Django 开发指南
项目创建
# 创建名为 django01的项目
django-admin startproject django01
# 进入项目目录
cd django01
# 创建名为 app01的app
django-admin startapp app01
# 启动django项目,默认127.0.0.1:8000
python manage.py runserver
# 指定端口启动
python manage.py runserver 8002
# 创建超级管理员
python manage.py createsuperuser
Setting.py
文档:
https://docs.djangoproject.com/zh-hans/3.2/ref/settings/
服务主机
# 安全措施,允许哪些地址访问
DEBUG = True # 生产环境要禁用
ALLOWED_HOSTS = [] # 默认。DEBUG=True时,根据['.localhost', '127.0.0.1', '[::1]']验证
ALLOWED_HOSTS = ['*'] # 允许所有
ALLOWED_HOSTS = ['127.0.0.1', '192.168.1.10'] # 允许指定ip访问
注册app
INSTALLED_APPS = [
...
'apps.app01.App01Config', # 新加
'apps.user.UserConfig' # 新加
]
Mysql数据库
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'db_name',
'USER': 'root',
'PASSWORD': '123456',
'HOST': '127.0.0.1',
'PORT': 3306
},
...
}
Pymysql
- 推荐使用mysqlclient
pip install pymysql
settings.py 同级__init__.py
import pymysql
pymysql.install_as_MySQLdb()
Mysqlcilent
pip install mysqlclient
缓存
pip install django-redis
settings.py
CACHES = {
# 默认
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': '',
'TIMEOUT': 300, # 缓存超时时间,单位秒。None:永久不过期
'OPTIONS': {
'MAX_ENTRIES': 300, # 允许缓存最大条目
'CULL_FREQUENCY': 3, # 缓存条目最大时,删除条目的比例。1:全部;2:1/2;3:1/3
},
},
# 附加缓存
'session': {
'BACKEND': 'django_redis.cache.RedisCache', # 缓存redis
'LOCATION': 'redis://127.0.0.1:6379/1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
# 'PASSWORD': ''
}
},
# 购物车
'cart': {
'BACKEND': 'django_redis.cache.RedisCache', # 缓存redis
'LOCATION': 'redis://127.0.0.1:6379/2',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
# 'PASSWORD': ''
}
},
}
views.py
from django_redis import get_redis_connection
conn = get_redis_connection('cart')
print(conn.hgetall('xxx'))
HTML模板目录
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(BASE_DIR, 'templates')] # 指定html文件所在的位置目录templates
,
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
语言时区
LANGUAGE_CODE = 'zh-hans' # 中文显示
TIME_ZONE = 'Asia/Shanghai' # 设置时区
USE_I18N = True # 国际化
USE_L10N = True # admin后台显示时间格式化为 '2020-09-12 12:00:00'
USE_TZ = False # ORM查询返回,是否自动转换为UTC时间
静态文件
# 静态文件夹的别名
STATIC_URL = '/static/'
# 所有静态文件(css/js/图片)都放在我下面你配置的文件夹中
STATICFILES_DIRS = [
os.path.join(BASE_DIR, "static"),
]
Session
# 数据库Session(默认)
SESSION_ENGINE = 'django.contrib.sessions.backends.db' # 存储数据库
# 缓存Session
SESSION_ENGINE = 'django.contrib.sessions.backends.cache' # 指定缓存存储
SESSION_CACHE_ALIAS = 'default' # 使用的缓存别名(默认内存缓存,也可以是memcache),此处别名依赖缓存的设置
# 文件Session
SESSION_ENGINE = 'django.contrib.sessions.backends.file' # 存储文件
SESSION_FILE_PATH = None # 缓存文件路径,如果为None,则使用tempfile模块获取一个临时地址tempfile.gettempdir()
# 缓存+数据库
SESSION_ENGINE = 'django.contrib.sessions.backends.cached_db' # 引擎
# 加密Cookie Session
SESSION_ENGINE = 'django.contrib.sessions.backends.signed_cookies' # 引擎
# 其他公用设置项:
SESSION_COOKIE_NAME = "sessionid" # Session的cookie保存在浏览器上时的key,即:sessionid=随机字符串(默认)
SESSION_COOKIE_PATH = "/" # Session的cookie保存的路径。默认 /
SESSION_COOKIE_DOMAIN = None # Session的cookie保存的域名。默认 None
SESSION_COOKIE_SECURE = False # 是否Https传输cookie。默认 False
SESSION_COOKIE_HTTPONLY = True # 是否Session的cookie只支持http传输。默认 True
SESSION_COOKIE_AGE = 1209600 # session的cookie失效日期,单位秒,默认2周
SESSION_EXPIRE_AT_BROWSER_CLOSE = False # 是否关闭浏览器使得Session过期。默认
SESSION_SAVE_EVERY_REQUEST = False # 是否每次请求都保存Session,默认修改之后才保存。默认
日志
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {message} ',
'style': '{',
},
'simple': {
'format': '{levelname} {message}',
'style': '{',
},
},
'handlers': {
'sql': {
'level': 'DEBUG',
'class': 'logging.FileHandler',
'filename': os.path.join(BASE_DIR, "logs/sql.log"),
'formatter': 'verbose'
},
'error': {
'level': 'INFO',
'class': 'logging.FileHandler',
'filename': os.path.join(BASE_DIR, "logs/error.log"),
'formatter': 'verbose'
},
},
'loggers': {
'django': {
'handlers': ['error'],
'level': 'DEBUG',
'propagate': True,
},
'error': {
'handlers': ['error'],
'level': 'DEBUG',
'propagate': True,
},
'django.db.backends': {
'handlers': ['sql'],
'level': 'DEBUG',
'propagate': True,
},
},
}
Simpleui 后台
官网教程:https://simpleui.72wo.com/docs/simpleui/doc.html
pip install django-simpleui
settings.py 加入
INSTALLED_APPS = [
'simpleui',
...
]
环境打包
# 打包环境
pip freeze > requirements.txt
# 下载环境
pip install -r requirements.txt
Model 模型
- 文档:https://docs.djangoproject.com/zh-hans/3.2/ref/models/fields/
class Author(models.Model):
GENDERS = (
(0, '男'),
(1, '女')
)
name = models.CharField(max_length=20, verbose_name='名字', db_column='name')
gender = models.IntegerField(default=0, choices=GENDERS, verbose_name='性别')
created = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated = models.DateTimeField(auto_now=True, verbose_name='更新时间')
deleted = models.IntegerField(default=0, verbose_name='逻辑删除')
class Meta:
# managed = False
db_table = 'author'
app_label = 'demo'
# 逻辑删除
def delete(self, using=None, keep_parents=False):
self.deleted = 1
self.save()
class Book(models.Model):
name = models.CharField(max_length=30, default='', verbose_name='书名')
author = models.ForeignKey(Author, on_delete=models.CASCADE, db_constraint=False, related_name='作者') # db_constraint=False:逻辑外键。数据库不使用外键,仅django内部使用
price = models.IntegerField(default=0, verbose_name='价格')
seq = models.IntegerField(default=0, verbose_name='顺序')
created = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated = models.DateTimeField(auto_now=True, verbose_name='更新时间')
deleted = models.IntegerField(default=0, verbose_name='逻辑删除')
class Meta:
# managed = False
db_table = 'book'
app_label = 'demo'
索引
class Meta:
# 索引
indexes = [
models.Index(fields=['project']),
]
# 联合索引
index_together = [["pub_date", "deadline"]]
# index_together = ["pub_date", "deadline"] # 单字段可以一个列表
# 联合唯一索引
unique_together = [['driver', 'restaurant']]
# unique_together = ['driver', 'restaurant']
DateTimeField
精度默认为 6,修改为0 需要将相对配置 datetime(6) 改为 datetime
## settings.py
from django.db.backends.mysql.base import DatabaseWrapper
DatabaseWrapper.data_types['DateTimeField'] = 'datetime'
模型逆向生成
python manage.py inspectdb > apps/models.py
# 指定表生成
python manage.py inspectdb tb > apps/models.py
清理migrations
python manage.py makemigrations # 确保 migration文件和数据库同步
python manage.py showmigrations # 看文件是否都迁移成功。[X]表示迁移成功
python manage.py migrate --fake app名 zero # 清除迁移历史记录
python manage.py makemigrations # 再次生成迁移文件
python manage.py migrate --fake-initial # 执行迁移,但不会真的执行
多数据库
settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': '',
'USER': '',
'PASSWORD': '',
'HOST': 'localhost',
'PORT': 3306
},
'db02': {
'ENGINE': 'django.db.backends.mysql',
'NAME': '',
'USER': '',
'PASSWORD': '',
'HOST': 'localhost',
'PORT': 3306
}
}
# 数据库路由文件地址
DATABASE_ROUTERS = ['fresh.database_router.DatabaseAppsRouter']
# 配置app对应的路由表
DATABASE_APPS_MAPPING = {
'app01': 'default',
'app02': 'db02',
'app03': 'db02',
}
创建数据库路由规则
from django.conf import settings
DATABASE_MAPPING = settings.DATABASE_APPS_MAPPING
class DatabaseAppsRouter:
"""
A router to control all database operations on models for different
databases.
In case an app is not set in settings.DATABASE_APPS_MAPPING, the router
will fallback to the `default` database.
Settings example:
DATABASE_APPS_MAPPING = {'app1': 'db1', 'app2': 'db2'}
"""
def db_for_read(self, model, **hints):
""""Point all read operations to the specific database."""
if model._meta.app_label in DATABASE_MAPPING:
return DATABASE_MAPPING[model._meta.app_label]
return None
def db_for_write(self, model, **hints):
"""Point all write operations to the specific database."""
if model._meta.app_label in DATABASE_MAPPING:
return DATABASE_MAPPING[model._meta.app_label]
return None
def allow_relation(self, obj1, obj2, **hints):
"""Allow any relation between apps that use the same database."""
db_obj1 = DATABASE_MAPPING.get(obj1._meta.app_label)
db_obj2 = DATABASE_MAPPING.get(obj2._meta.app_label)
if db_obj1 and db_obj2:
if db_obj1 == db_obj2:
return True
else:
return False
return None
def allow_syncdb(self, db, model):
"""Make sure that apps only appear in the related database."""
if db in DATABASE_MAPPING.values():
return DATABASE_MAPPING.get(model._meta.app_label) == db
elif model._meta.app_label in DATABASE_MAPPING:
return False
return None
def allow_migrate(self, db, app_label, model=None, **hints):
"""
Make sure the auth app only appears in the 'auth_db'
database.
"""
if db in DATABASE_MAPPING.values():
return DATABASE_MAPPING.get(app_label) == db
elif app_label in DATABASE_MAPPING:
return False
return None
创建model
from django.db import models
class Author(models.Model):
name = models.CharField(max_length=20)
created = models.DateTimeField()
updated = models.DateTimeField()
deleted = models.IntegerField()
class Meta:
db_table = 'author'
app_label = 'demo'
生成数据表
python manage.py makemigrations
python manage.py migrate --database=db02
跨域解决
方案一:禁用跨域校验(开发模式可以用)
MIDDLEWARE = [
# 'django.middleware.csrf.CsrfViewMiddleware', # 注释这行,会允许所有跨域请求
]
方案二:cors扩展(推荐)
pip install django-cors-headers
setting.py
# 配置应用
INSTALLED_APPS = [
...
'corsheaders',
...
]
# 中间层设置
MIDDLEWARE = [
'corsheaders.middleware.CorsMiddleware', # 注意顺序放首位
...
]
CORS_ALLOW_CREDENTIALS = True # 允许携带cookie
# CORS_ORIGIN_ALLOW_ALL = True # 允许所有主机跨域
# 允许跨域白名单
CORS_ORIGIN_WHITELIST = (
'127.0.0.1:8080',
...
)
# 允许跨域请求方法
CORS_ALLOW_METHODS = (
'GET',
'POST',
'PUT',
'PATCH',
'DELETE',
'OPTIONS',
'VIEW',
)
# 允许跨域请求头
CORS_ALLOW_HEADERS = (
'XMLHttpRequest',
'X_FILENAME',
'accept-encoding',
'authorization',
'content-type',
'dnt',
'origin',
'user-agent',
'x-csrftoken',
'x-requested-with',
'Pragma',
)
Session 会话
全局配置
# setting.py
# 缓存session
CACHES = {
...
'session': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
# 'PASSWORD': ''
}
}
}
# Session配置
SESSION_ENGINE = 'django.contrib.sessions.backends.cache' # 指定redis存储
SESSION_CACHE_ALIAS = 'session' # 使用的缓存别名
SESSION_COOKIE_AGE = 30 # 过期时间,单位秒。默认2周
REST_FRAMEWORK = {
# 自定义认证
'DEFAULT_AUTHENTICATION_CLASSES': (
'utils.custom_authentication.CustomJWTAuth',
),
}
命令
request.session.session_key # 获取sessionid
request.session.exists('sessionid') # 判断获取sessionid是否存在
request.session['key'] = 'value' # 设置session
request.session.setdefault('key', 'value') # 不存在则设置
request.session.get('key') # 获取session,默认None
del request.session['key'] # 删除session指定键值对
request.session.delete() # 删除当前会话数据和数据库数据
request.session.clear() # 清除session,删除存储session值
request.session.flush() # 清除session,删除当前会话cookie和存储session(常用)
request.session.keys() # 获取所有键
request.session.values() # 获取所有值
request.session.items() # 获取所有键值对
request.session.iterkeys() #
request.session.itervalues() #
request.session.iteritems() #
request.session.clear_expired() # 删除所有失效日期小于当前日期的session
request.session.set_expiry(value) # 设置session、cookie有效期
# 整数:指定秒后过期;
# datatime或timedelta:指定时间过期;
# 0:用户关闭浏览器时过期
Redis
Transaction 事务
from django.db import transaction
# 装饰器用法
@transaction.atomic
def func(request):
# 函数下所有代码会在一个事务中执行
pass
# with用法,更灵活
def func(request):
# 这部分代码不在事务中,会被 Django 自动提交
pass
with transaction.atomic():
# 这部分代码会在事务中执行
s1 = transaction.savepoint() # 设置保存点,可设置多个
transaction.savepoint_rollback(s1) # 事务回滚至指定保存点
transaction.savepoint_commit(s1) # 提交事务
pass
ORM
查询条件
# 使用方式,字段__条件
exact # 精准 =
iexact # 精准 =,忽略大小写
contains # 包含
icontains # 包含,忽略大小写
in # 在容器中
gt # 大于 >
gte # 大于等于 >=
lt # 小于 <
lte # 小于等于 <=
startswith # 字段以给定值开始
istartswith # 字段以给定值开始,忽略大小写
endswith # 字段以给定值结束
iendswith # 字段以给定值结束,忽略大小写
range # 用于指定容器范围,并查询容器范围的数据
date # 针对某些date或者datetime类型的字段,根据date部分进行过滤(年月日)
time # 根据时间来进行查询:比较日期时间(datetime)字段的时间部分
year # 根据年份进行查询
isnull # 对应SQL语句中的IS NULL和IS NOT NULL,可接受参数为True和False
regex # 根据指定的正则表达式来查询
iregex # 根据指定的正则表达式来查询,忽略大小写
排序
from django.db.models import F
# name字段为null的,排最前面
.order_by(F('name').asc(nulls_first=True), 'create_time')
# name字段为null的,排最后边
.order_by(F('name').asc(nulls_last=True), 'create_time')
# 查不到返回404
get_object_or_404()
DRF 框架
安装DRF
pip install djangorestframework # drf框架
pip install django-filter # 过滤器
setting.py 配置
INSTALLED_APPS = [
...
'rest_framework',
'django_filters',
'apps.book',
...
]
...
REST_FRAMEWORK = {
# 返回时间格式
'DATETIME_FORMAT': "%Y-%m-%d %H:%M:%S",
# 自定义异常响应格式
'EXCEPTION_HANDLER': 'utils.custom_exception.custom_exception_handler',
# # 认证
# 'DEFAULT_AUTHENTICATION_CLASSES': (
# 'rest_framework.authentication.BasicAuthentication',
# 'rest_framework.authentication.SessionAuthentication',
# ),
# # 权限
# 'DEFAULT_PERMISSION_CLASSES': (
# 'rest_framework.permissions.IsAuthenticated',
# ),
# 限流
'DEFAULT_THROTTLE_CLASSES': (
'rest_framework.throttling.AnonRateThrottle', # 游客
'rest_framework.throttling.UserRateThrottle' # 用户
),
'DEFAULT_THROTTLE_RATES': {
'anon': '100/day',
'user': '10/second'
},
# 过滤
'DEFAULT_FILTER_BACKENDS': ('django_filters.rest_framework.DjangoFilterBackend',)
}
DRF的基本使用
models.py
from django.db import models
class Author(models.Model):
GENDERS = (
(0, '男'),
(1, '女')
)
name = models.CharField(max_length=20, verbose_name='名字')
gender = models.IntegerField(default=0, choices=GENDERS, verbose_name='性别')
created = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated = models.DateTimeField(auto_now=True, verbose_name='更新时间')
deleted = models.IntegerField(default=0, verbose_name='逻辑删除')
class Meta:
# managed = False
db_table = 'author'
# 逻辑删除
def delete(self, using=None, keep_parents=False):
self.deleted = 1
self.save()
class Book(models.Model):
name = models.CharField(max_length=30, default='', verbose_name='书名')
author = models.ForeignKey(Author, on_delete=models.CASCADE, db_constraint=False,
related_name='作者') # db_constraint=False:数据库不使用外键,仅django内部使用
price = models.IntegerField(default=0, verbose_name='价格')
seq = models.IntegerField(default=0, verbose_name='顺序')
created = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated = models.DateTimeField(auto_now=True, verbose_name='更新时间')
deleted = models.IntegerField(default=0, verbose_name='逻辑删除')
class Meta:
# managed = False
db_table = 'book'
urls.py
from django.urls import path
from rest_framework import routers
from apps.book.views import BookView
urlpatterns = [
# path('books', BookView.as_view({'get': 'list'})),
]
# trailing_slash为False,url不自动添加尾斜杠
router = routers.SimpleRouter(trailing_slash=False)
router.register(r'author', AuthorView, basename='author')
router.register(r'book', BookView2, basename='book')
urlpatterns += router.urls
serializer.py
from rest_framework import serializers
from apps.demo.models import Book, Author
class AuthorSerializer(serializers.ModelSerializer):
gender = serializers.CharField(source='get_gender_display', read_only=True, required=False)
class Meta:
model = Author
fields = '__all__' # 返回字段,__all__ 代表所有
class BookSerializer(serializers.ModelSerializer):
"""
read_only:只在查询时有效
write_only:只在写入时有效
"""
author = serializers.CharField(label='作者', source='author_id.name', read_only=True)
# author_id = serializers.PrimaryKeyRelatedField(label='作者', read_only=True)
class Meta:
model = Book
fields = '__all__' # 返回字段,__all__ 代表所有
views.py
class AuthorView(ModelViewSet):
"""
新增:POST /author
删除:DELETE /author/1
修改:PATCH /author/1
作者列表查询:GET /author
作者图书查询:GET /author/{pk}/book
"""
queryset = Author.objects.filter(deleted=0)
serializer_class = AuthorSerializer
@action(methods=['get'], detail=True, serializer_class=BookSerializer)
def book(self, request, pk=None, *args, **kwargs):
self.queryset = Book.objects.filter(deleted=0, author=pk)
return self.list(self, request, *args, **kwargs)
class BookView(APIView):
"""
新增:POST /book
"""
def post(self, request, *args, **kwargs):
# 创建或更新
name = request.data.pop('name')
res, created = Book.objects.update_or_create(defaults=request.data, name=name)
ser = BookSerializer(res)
return Response(ser.data)
Serializer.py 序列化器
**序列化:**对查询结果处理
**反序列化:**对接收参数处理
from rest_framework import serializers
from apps.book.models import Book
class BookSerializer(serializers.ModelSerializer):
"""
read_only:只在查询时有效
write_only:只在写入时有效
"""
name = serializers.CharField(max_length=30)
author = serializers.CharField(max_length=30)
price = serializers.IntegerField(required=False)
seq = serializers.IntegerField(required=False)
author = serializers.CharField(read_only=True, source='author.name') # 序列化外键字段
class Meta:
model = Book
fields = '__all__' # 返回字段,__all__ 代表所有
exclude = ('seq',) # 除了seq 字段,其他都序列化
# 指定字段说明
extra_kwargs = {
'name': {'write_only': True},
'author': {'write_only': True},
'price': {'write_only': True}
}
# 改变序列输出行为
def to_representation(self, instance):
ret = super().to_representation(instance)
# json字符串 => 字典
if 'field' in ret:
try:
ret['field'] = json.loads(ret['field'])
except ValueError:
pass
return ret
view.py
class BookView(APIView):
def post(self, request, *args, **kwargs):
# 创建或更新
name = request.data.pop('name')
res, created = Book.objects.update_or_create(defaults=request.data, name=name)
ser = BookSerializer(res)
return Response(ser.data)
BookSerializer(res, many)
嵌套序列化
## models.py
from django.db import models
class Author(models.Model):
name = models.CharField(max_length=20)
class Meta:
# managed = False
db_table = 'author'
app_label = 'demo'
class Book(models.Model):
name = models.CharField(max_length=30)
author = models.ForeignKey(Author, on_delete=models.CASCADE, related_name='books', db_constraint=False) # db_constraint=False:数据库不使用外键,仅django内部使用
class Meta:
# managed = False
db_table = 'book'
app_label = 'demo'
常规嵌套
# 书籍嵌套作者
class AuthorSerializer(serializers.ModelSerializer):
class Meta:
model = Author
fields = '__all__'
class BookSerializer(serializers.ModelSerializer):
author = AuthorSerializer(read_only=True, source='author')
class Meta:
model = Book
fields = '__all__'
# 作者嵌套书籍
class BookSerializer(serializers.ModelSerializer):
class Meta:
model = Book
fields = '__all__'
class AuthorSerializer(serializers.ModelSerializer):
books = AuthorSerializer(read_only=True, source='books', many=True)
class Meta:
model = Author
fields = '__all__'
从下往上查
## serializer.py
from rest_framework import serializers
# 底层往上查
class BookSerializer(serializers.ModelSerializer):
class Meta:
model = Book
fields = '__all__'
depth = 2 # 递归深度
从上往下查
## serializer.py
from rest_framework import serializers
class BookSerializer(serializers.ModelSerializer):
# 多对一序列化。author_id:外键字段名
author = serializers.CharField(label='作者', source='author_id.name', read_only=True)
#image_list = ProductImageSerializer(source='product_info', read_only=True, many=True)
class Meta:
model = Book
fields = '__all__'
多级嵌套
## models.py
class Area(models.Model):
p_id = models.ForeignKey('self', db_column='p_id', db_constraint=False, on_delete=models.SET_DEFAULT, default=0, verbose_name='地区', related_name='children')
name = models.CharField(max_length=50, verbose_name='地区名')
class Meta:
db_table = 'area'
## serializer.py
from rest_framework import serializers
class AreaSerializer3(serializers.ModelSerializer):
class Meta:
model = Area
fields = '__all__'
class AreaSerializer2(serializers.ModelSerializer):
children = AreaSerializer3(many=True, read_only=True)
class Meta:
model = Area
fields = '__all__'
class AreaSerializer(serializers.ModelSerializer):
children = AreaSerializer2(many=True, read_only=True)
class Meta:
model = Area
fields = '__all__'
## views.py
from rest_framework import mixins
from rest_framework.viewsets import GenericViewSet
class AreaView(GenericViewSet, mixins.ListModelMixin):
queryset = Area.objects.filter(pid=0)
serializer_class = AreaSerializer
递归嵌套
## models.py
class Area(models.Model):
p_id = models.ForeignKey('self', null=True, db_column='p_id', db_constraint=False, on_delete=models.CASCADE, verbose_name='地区')
name = models.CharField(max_length=50, verbose_name='地区名')
class Meta:
db_table = 'area'
## serializer.py
from rest_framework import serializers
class AreaSerializer(serializers.ModelSerializer):
children = serializers.SerializerMethodField()
class Meta:
model = Area
fields = '__all__'
def get_children(self, obj)
res = Area.objects.filter(p_id=obj.id)
return AreaSerializer(res, many=True).data
## views.py
from rest_framework import mixins
from rest_framework.viewsets import GenericViewSet
class AreaView(GenericViewSet, mixins.ListModelMixin):
queryset = Area.objects.filter(pid=None)
serializer_class = AreaSerializer
ChoiceDield
class User(models.Model):
GENDERS = (
(0, '男'),
(1, '女'),
)
gender = models.IntegerField(default=0, choices=GENDERS)
class UserSerializer(serializers.ModelSerializer):
# 使用 get_<字段名>_display的方法
gender = serializers.CharField(source='get_gender_display', required=False)
class Meta:
model = User
fields = '__all__'
获取数据
## views.py
# 传参
self.kwargs['user_id'] = 123
## serializer.py
# 取参
self.context.get('view').kwargs
序列化器调用
# 创建
ser = BookSerializer(data)
if ser.is_valid():
ser.save()
# 更新
instance = Book.objects.filter(id=1).first()
ser = BookSerializer(instance, data)
# ser = BookSerializer(instance=instance, data=data, partial=True) # 部分更新
if ser.is_valid():
ser.save()
重写方法
##views.py
class OrderView(GenericAPIView, mixins.CreateModelMixin):
authentication_classes = [FreshSessionAuth]
queryset = OrderMaster.objects.filter(deleted=0)
serializer_class = OrderMasterSerializer
def post(self, request, *args, **kwargs):
self.kwargs['user_id'] = 123
return self.create(request, *args, **kwargs)
##serializer.py
class OrderMasterSerializer(serializers.ModelSerializer):
class Meta:
model = OrderMaster
fields = '__all__'
def create(self, validated_data):
user_id = self.context.get('view').kwargs.get('user_id')
validated_data['user_id'] = user_id
return super().create(validated_data)
# return OrderMaster.objects.create(**validated_data)
def update(self, instance, validated_data):
if validated_data.get('pay_state') == 1:
validated_data['order_state'] = 1
super().update(instance, validated_data)
return instance
Filters.py 过滤器
## filters.py
from django_filters import filters
from django_filters.rest_framework import FilterSet
from apps.book.models import Book
class BookFilter(FilterSet):
"""
field_name:指定字段,默认同名
lookup_expr:匹配方式,orm同款
"""
name = filters.CharFilter(field_name='name', lookup_expr='icontains')
class Meta:
model = Book
fields = ['is_delete', 'name'] # 过滤条件, 精准匹配
# or
fields = {
'is_delete': ['exact', 'contains'],
'name': ['exact', 'contains'],
}
Pages.py 分页器
from rest_framework.pagination import PageNumberPagination
class LargePagination(PageNumberPagination):
page_size = 100 # 每页显示数量
page_size_query_param = 'page_size' # 分页控件的查询参数的名称
max_page_size = 10000 # 允许的最大页面大小。该属性仅在 page_size_query_param 也被设置时有效
class StandardPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 1000
APIView类视图
from rest_framework.views import APIView
class BookView(APIView):
renderer_classes = [] # 自定义过滤器
authentication_classes = [] # 自定义认证
throttle_classes = [] # 自定义限流
permission_classes = [] # 自定义权限
def get(self, request):
pass
def post(self, request):
pass
ModelViewSet 视图集
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.authentication import SessionAuthentication, BasicAuthentication
from rest_framework.filters import SearchFilter, OrderingFilter
from rest_framework.pagination import PageNumberPagination
from rest_framework.permissions import IsAuthenticated
from rest_framework.viewsets import ModelViewSet
from apps.book.models import Book
from apps.book.filters import BookFilter
from apps.book.pages import LargeSetPagination
from apps.book.serializers import BookSerializer
class BookView(ModelViewSet):
# renderer_classes = [] # 自定义过滤器
# authentication_classes = [] # 自定义认证
# throttle_classes = [] # 自定义限流
# permission_classes = [] # 自定义权限
# http_method_names = ['get', 'post',] # 允许请求方式
queryset = Book.objects.all()
serializer_class = BookSerializer
# pagination_class = LargeSetPagination # 自定义分页器
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
# 过滤方案一 filter_class 自定义高级过滤规则
# filter_class = BookFilter # 与filter_fields相斥,需配置DjangoFilterBackend
# 过滤方案二 filter_fields 精准查询
# filter_backends = [DjangoFilterBackend] # 全局配置过滤器后 无需配置此项
filter_fields = ['name'] # url中关键字结合列表内字段各自精准匹配
# 过滤 模糊查询
# filter_backends = [SearchFilter] # 设置过滤后端
search_fields = ['name', 'Author__name'] # url中的关键字为search,列表内所有字段内做模糊匹配。匹配外键字段是需写:外键名__外键字段
# # 排序
# filter_backends = [OrderingFilter] # 设置排序后端
ordering_fields = ['seq', 'id'] # 可排序字段,倒序需在url参数后字段前面加 -
ordering = ['seq'] # 默认排序
# 分页
# pagination_class = None # 关闭分页。全局配置后进行关闭
def retrieve(self, request, *args, **kwargs):
print('retrieve')
return super().retrieve(request, *args, **kwargs)
def list(self, request, *args, **kwargs):
print('list')
return super().list(request, *args, **kwargs)
def create(self, request, *args, **kwargs):
return super().create(request, *args, **kwargs)
请求示例:
http://127.0.0.1/book?search=王子
http://127.0.0.1/book?name=小王子&is_delete=0
http://127.0.0.1/book?ordering=seq,-id
批量创建
class BookView(ModelViewSet):
queryset = Book.objects.all()
serializer_class = BookSerializer
http_method_names = ['post',]
def get_serializer(self, *args, **kwargs):
if 'data' in kwargs:
if isinstance(kwargs['data'], list):
kwargs['many'] = True
return super().get_serializer(*args, **kwargs)
自定义路由
urls.py
urlpatterns = [
# 额外行为,单独定义路由。或使用 @action注册配合路由器自动生成
# path('books/latest', BookView.as_view({'get': 'latest'})),
]
router = routers.SimpleRouter(trailing_slash=False)
router.register(r'books', BookView)
urlpatterns += router.urls
views.py
from rest_framework.decorators import action
from rest_framework.viewsets import ModelViewSet
class BookView(ModelViewSet):
queryset = Book.objects.filter(is_delete=0)
serializer_class = BookSerializer
@action(methods=['get'], detail=False) # 使路由器自动生成该路由。detail=False表示不需要pk
def latest(self, request, *args, **kwargs):
book = Book.objects.latest('id')
ser = self.get_serializer(book)
return Response(ser.data)
@action(methods=['get'], detail=True) # 使路由器自动生成该路由。detail=True表示需要pk,路由为 books/{pk}/lastst
def latest(self, request, *args, **kwargs):
pk = kwargs['pk']
book = Book.objects.latest('id')
ser = self.get_serializer(book)
return Response(ser.data)
版本控制
全局配置
## settings.py
REST_FRAMEWORK = {
'DEFAULT_VERSIONING_CLASS': 'rest_framework.versioning.NamespaceVersioning',
'DEFAULT_VERSION': 'v1', # 默认版本(从request对象里取不到,显示的默认值)
'ALLOWED_VERSIONS': ['v1', 'v2'], # 允许的版本,与接口文档冲突
'VERSION_PARAM': 'version' # URL中获取值的key
}
## urls.py
path(r'v1/api/', include(('apps.urls', "apps"), namespace="v1")),
path(r'v2/api/', include(('apps.urls', "apps"), namespace="v2")),
自定义异常类响应码
custom_exceptions.py
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import APIException
class HaveExisted(APIException):
"""资源已存在"""
status_code = 409
default_detail = _('Have Existed.')
default_code = 'have_existed'
自定义异常格式
全局配置
## settings.py
REST_FRAMEWORK = {
# 自定义异常格式
'EXCEPTION_HANDLER': 'utils.custom_exception.custom_exception_handler',
}
custom_response_exception.py
"""
自定义异常响应格式:{'code':0,'message':'成功','data':[]}
"""
from rest_framework.response import Response
from rest_framework.views import exception_handler
def custom_exception_handler(exc, context):
response = exception_handler(exc, context)
res = {}
if response is None:
res = {
'code': 9999,
'message': '服务器未知错误'
}
return response
else:
data = []
# print('exception', response.status_code, response.data)
res['code'] = response.status_code
if isinstance(response.data, dict):
data = dict(response.data)
if response.status_code >= 500:
res['message'] = '服务器错误'
elif response.status_code == 404:
res['message'] = '未找到'
elif response.status_code == 400:
res['message'] = '参数错误'
elif response.status_code == 401:
res['message'] = '未认证'
elif response.status_code == 403:
res['message'] = '权限不允许'
elif response.status_code == 405:
res['message'] = '方法不允许'
elif response.status_code == 409:
res['message'] = '资源已存在'
else:
res['message'] = '未知错误'
res['data'] = data
return Response(res)
自定义响应格式
全局配置
# setting.py
REST_FRAMEWORK = {
# 自定义响应格式
'DEFAULT_RENDERER_CLASSES': [
'utils.custom_response.CustomRenderer'
],
}
custom_response.py
"""
自定义返回处理格式:{'code':0,'message':'成功','data':[]}
"""
from rest_framework.renderers import JSONRenderer
class CustomRenderer(JSONRenderer):
# 重构render方法
def render(self, data, accepted_media_type=None, renderer_context=None):
print('renderer', data)
ret = {}
if renderer_context:
# 如果返回的data为字典
if isinstance(data, dict) and data.get('code') and data.get('message'):
'''
若响应信息中已有code、message,则pass返回
'''
ret = data
elif isinstance(data, dict) and data.get('count') and data.get('results'):
'''
若响应信息中已有count、results,则为分页列表消息,对其格式化
'''
# ret['code'] = data.pop('code', renderer_context["response"].status_code)
ret['code'] = 0
ret['message'] = data.pop('message', '成功')
ret['count'] = data['count']
ret['next'] = data['next']
ret['previous'] = data['previous']
ret['data'] = data['results']
else:
'''
若响应信息不是字典,则直接格式化返回
'''
# ret['code'] = renderer_context["response"].status_code
ret['code'] = 0
ret['message'] = '成功'
ret['data'] = data
renderer_context["response"]['status_code'] = 200
return super().render(ret, accepted_media_type, renderer_context)
else:
return super().render(data, accepted_media_type, renderer_context)
自定义认证
Session
全局配置
# setting.py
REST_FRAMEWORK = {
# 自定义认证
'DEFAULT_AUTHENTICATION_CLASSES': (
'utils.custom_authentication.CustomSessionAuth',
),
}
局部使用
class BookView(APIView):
authentication_classes = [CustomSessionAuth]
保存session
request.session['id'] = 123
request.session['username'] = 'abc'
验证
class CustomSessionAuth(BaseAuthentication):
def authenticate(self, request):
print('session', request.session)
session_key = request.session.session_key
if not session_key:
raise AuthenticationFailed('未认证')
if not request.session.get('id'):
raise AuthenticationFailed('认证失败')
return None
JWT Token
全局配置
# setting.py
REST_FRAMEWORK = {
# 自定义认证
'DEFAULT_AUTHENTICATION_CLASSES': (
'utils.custom_authentication.CustomJWTAuth',
),
}
局部使用
class BookView(APIView):
authentication_classes = [CustomJWTAuth]
生成 jwt token
import datetime
import jwt
def create_token(payload):
print(type(payload))
t = datetime.datetime.now()
key = 'lp'
payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=600)
token = jwt.encode(payload=payload,
key=key,
algorithm='HS256')
# print('create_token ', token)
return token
custom_authentication.py
import jwt
from jwt import exceptions
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from apps.user.models import UserWechat
class CustomJWTAuth(BaseAuthentication):
def authenticate(self, request):
token = request.META.get('HTTP_Authorization'.upper())
key = 'lp'
payload = None
if token is None:
raise AuthenticationFailed('未认证')
try:
payload = jwt.decode(token, key, algorithms=['HS256'])
# print('validate_token ', type(payload), payload)
except exceptions.ExpiredSignatureError:
raise AuthenticationFailed('签名已失效')
except jwt.DecodeError:
raise AuthenticationFailed('认证失败')
except jwt.InvalidTokenError:
raise AuthenticationFailed('签名非法')
return None
自定义权限
全局配置
'DEFAULT_PERMISSION_CLASSES': (
'utils.custom_permission.CustomPermission',
),
custom_permission.py
from rest_framework.permissions import BasePermission
class CustomPermission(BasePermission):
message = '没有权限'
def has_permission(self, request, view):
print(request.user)
print(view)
if 1:
return True
else:
return None
自定义限流
全局配置
# 限流
'DEFAULT_THROTTLE_CLASSES': (
'utils.custom_throttle.CustomAnonRateThrottle', # 自定义游客
'utils.custom_throttle.CustomUserRateThrottle', # 自定义用户
),
'DEFAULT_THROTTLE_RATES': {
'anon': '2/day',
'user': '4/day'
},
custom_throttle.py
from rest_framework.throttling import SimpleRateThrottle
class CustomAnonRateThrottle(SimpleRateThrottle):
scope = 'anon'
def get_cache_key(self, request, view):
# print('限流Anon', str(request.user), type(request.user))
if str(request.user) != 'AnonymousUser': # 若不是匿名用户
return None # Only throttle unauthenticated requests.
return self.cache_format % {
'scope': self.scope,
'ident': self.get_ident(request)
}
class CustomUserRateThrottle(SimpleRateThrottle):
scope = 'user'
def get_cache_key(self, request, view):
# print('限流User', request.user)
if str(request.user) != 'AnonymousUser': # 若不是匿名用户
ident = request.user.id
else:
ident = self.get_ident(request)
return self.cache_format % {
'scope': self.scope,
'ident': ident
}
DRF 状态码
"""
Descriptive HTTP status codes, for code readability.
See RFC 2616 - https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
And RFC 6585 - https://tools.ietf.org/html/rfc6585
And RFC 4918 - https://tools.ietf.org/html/rfc4918
"""
def is_informational(code):
return 100 <= code <= 199
def is_success(code): # 成功
return 200 <= code <= 299
def is_redirect(code): # 重定向
return 300 <= code <= 399
def is_client_error(code): # 请求错误
return 400 <= code <= 499
def is_server_error(code): # 服务器错误
return 500 <= code <= 599
HTTP_100_CONTINUE = 100 # 继续
HTTP_101_SWITCHING_PROTOCOLS = 101 # 切换协议
HTTP_200_OK = 200 # 正常
HTTP_201_CREATED = 201 # 已创建
HTTP_202_ACCEPTED = 202 # 已接受
HTTP_203_NON_AUTHORITATIVE_INFORMATION = 203 # 非授权信息
HTTP_204_NO_CONTENT = 204 # 无内容
HTTP_205_RESET_CONTENT = 205 # 重置内容
HTTP_206_PARTIAL_CONTENT = 206 # 部分内容
HTTP_207_MULTI_STATUS = 207 # 多状态
HTTP_208_ALREADY_REPORTED = 208 # 已报告
HTTP_226_IM_USED = 226 # 正在使用
HTTP_300_MULTIPLE_CHOICES = 300 # 多选择
HTTP_301_MOVED_PERMANENTLY = 301 # 永久重定向
HTTP_302_FOUND = 302 # 临时重定向
HTTP_303_SEE_OTHER = 303 # 分配新地址
HTTP_304_NOT_MODIFIED = 304 #
HTTP_305_USE_PROXY = 305 #
HTTP_306_RESERVED = 306 #
HTTP_307_TEMPORARY_REDIRECT = 307 # 临时重定向2
HTTP_308_PERMANENT_REDIRECT = 308 #
HTTP_400_BAD_REQUEST = 400 # 语法错误
HTTP_401_UNAUTHORIZED = 401 # 未认证
HTTP_402_PAYMENT_REQUIRED = 402 #
HTTP_403_FORBIDDEN = 403 # 无权限
HTTP_404_NOT_FOUND = 404 # 未找到
HTTP_405_METHOD_NOT_ALLOWED = 405 # 方法不允许
HTTP_406_NOT_ACCEPTABLE = 406 #
HTTP_407_PROXY_AUTHENTICATION_REQUIRED = 407 #
HTTP_408_REQUEST_TIMEOUT = 408 #
HTTP_409_CONFLICT = 409 #
HTTP_410_GONE = 410 #
HTTP_411_LENGTH_REQUIRED = 411 #
HTTP_412_PRECONDITION_FAILED = 412 #
HTTP_413_REQUEST_ENTITY_TOO_LARGE = 413 #
HTTP_414_REQUEST_URI_TOO_LONG = 414 #
HTTP_415_UNSUPPORTED_MEDIA_TYPE = 415 #
HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE = 416 #
HTTP_417_EXPECTATION_FAILED = 417 #
HTTP_418_IM_A_TEAPOT = 418 #
HTTP_422_UNPROCESSABLE_ENTITY = 422 #
HTTP_423_LOCKED = 423 #
HTTP_424_FAILED_DEPENDENCY = 424 #
HTTP_426_UPGRADE_REQUIRED = 426 #
HTTP_428_PRECONDITION_REQUIRED = 428 #
HTTP_429_TOO_MANY_REQUESTS = 429 # #
HTTP_431_REQUEST_HEADER_FIELDS_TOO_LARGE = 431 #
HTTP_451_UNAVAILABLE_FOR_LEGAL_REASONS = 451 #
HTTP_500_INTERNAL_SERVER_ERROR = 500 # 服务器错误
HTTP_501_NOT_IMPLEMENTED = 501 #
HTTP_502_BAD_GATEWAY = 502 #
HTTP_503_SERVICE_UNAVAILABLE = 503 #
HTTP_504_GATEWAY_TIMEOUT = 504 #
HTTP_505_HTTP_VERSION_NOT_SUPPORTED = 505 #
HTTP_506_VARIANT_ALSO_NEGOTIATES = 506 #
HTTP_507_INSUFFICIENT_STORAGE = 507 #
HTTP_508_LOOP_DETECTED = 508 #
HTTP_509_BANDWIDTH_LIMIT_EXCEEDED = 509 #
HTTP_510_NOT_EXTENDED = 510 #
HTTP_511_NETWORK_AUTHENTICATION_REQUIRED = 511 #
Elasticsearch
- 需要安装 elasticsearch2.4.1并启动
- windows:https://www.cnblogs.com/hualess/p/11540477.html
- 中文分词:elasticsearch-ik
安装
# 注意版本需相对应
pip install drf-haystack==1.8.10
pip install elasticsearch==2.4.1
配置
## settings.py
INSTALLED_APPS =
[
...
'rest_framework'
'haystack',
]
# Haystack
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'haystack.backends.elasticsearch_backend.ElasticsearchSearchEngine',
'URL': 'http://127.0.0.1:9200/', # 此处为elasticsearch运行的服务器ip地址,端口号固定为9200
'INDEX_NAME': 'fresh', # 指定elasticsearch建立的索引库的名称
},
}
# 当添加、修改、删除数据时,自动生成索引
HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor'
# 指定搜索结果每页的条数
# HAYSTACK_SEARCH_RESULTS_PER_PAGE = 1
创建索引类
from django.db import models
class Es(models.Model):
name=models.CharField(max_length=32)
desc=models.CharField(max_length=32)
同目录app下创建search_indexes.py
from haystack import indexes
from apps.product.models import ProductInfo
# 索引模型类的名称必须是 模型类名称 + Index
class ProductInfoIndex(indexes.SearchIndex, indexes.Indexable):
# text表示被查询的字段,用户搜索的是这些字段的值,具体被索引的字段写在另一个文件里
text = indexes.CharField(document=True, use_template=True)
# 保存在索引库中的字段
id = indexes.IntegerField(model_attr='id')
name = indexes.CharField(model_attr='name')
def get_model(self):
return ProductInfo
def index_queryset(self, using=None):
return self.get_model().objects.all()
创建模板文件
- 创建 templates/search/indexes/app/es_text.txt
- 名字要对应
{{ object.name }}
{{ object.desc }}
手动更新索引
- 数据库有多少条数据,全部会被同步到es中
# 首次重建
python manage.py rebuild_index
# 更新
python manage.py update_index
创建haystack序列化器
from drf_haystack.serializers
import HaystackSerializer
from rest_framework.serializers
import ModelSerializer from app
import models
from app.search_indexes import EsIndex
class EsSerializer(ModelSerializer):
class Meta:
model=models.Es
fields='__all__'
class EsIndexSerializer(HaystackSerializer):
object = EsSerializer(read_only=True) # 只读,不可以进行反序列化
class Meta:
index_classes = [EsIndex] # 索引类的名称
fields = ('text', 'object') # text:由索引类返回,固定名称;object:由序列化类返回
类试图调用
from drf_haystack.viewsets
import HaystackViewSet
from app.models import Book
from app.serializers import EsIndexSerializer
class EsSearchView(HaystackViewSet):
index_models = [Es]
serializer_class = EsIndexSerializer
路由
from django.conf.urls
import url from django.contrib
import admin
from rest_framework import routers
from app.views import EsSearchView
router = routers.DefaultRouter()
router.register("book/search", EsSearchView, base_name="book-search")
urlpatterns = [ url(r'^admin/', admin.site.urls), ]
urlpatterns += router.urls
测试
http://127.0.0.1:8000/?text=测试
Celery
pip install django-celery
- proj/celery.py
import os
from celery import Celery
from datetime import datetime, timedelta
# 设置默认的Django设置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'alarmManagerBackend.settings')
app = Celery('alarmManagerBackend')
# 在这里使用字符串意味着worker不需要序列化
# 子进程的配置对象
# - namespace表示所有芹菜相关的配置键,应该有一个“CELERY_”前缀。
app.config_from_object('django.conf:settings', namespace='CELERY')
# 从所有注册的Django应用中加载任务模块。
app.autodiscover_tasks()
# 定时任务
app.conf.beat_schedule = {
'skywalking_self_recover': {
'task': 'skywalking.tasks.skywalking_self_recover',
# 'schedule': timedelta(minutes=2)
'schedule': 3
},
}
apps/xxx/tasks.py
from celery import shared_task
@shared_task
def task_demo():
print(2)
return 2
启动命令
## windows
celery -A proj worker -l INFO --pool=solo -P eventlet
celery -A proj beat -l info
--logfile=E:\GCloud\alarmmanagerbackend\logs\celery.log
celery -A alarmManagerBackend beat -l info --
# linux启动,不要用root用户
celery multi start -A proj worker -l info --logfile=celery.log --pidfile=celery.pid
celery multi restart -A proj worker -l info --logfile=celery.log --pidfile=celery.pid
celery multi stop -A proj worker -l info --logfile=celery.log --pidfile=celery.pid
# celery进程全杀
ps auxww | grep 'celery' | awk '{print $2}' | xargs kill -9
发邮件
配置文件
## settings.py
# 配置邮件服务器
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.163.com' # 如果是 qq 改成 smtp.qq.com
EMAIL_PORT = 465 # 发邮件端口
EMAIL_HOST_USER = '' # 邮箱
EMAIL_HOST_PASSWORD = '' # 密码授权码
EMAIL_FROM = EMAIL_HOST_USER # 发件人抬头
# EMAIL_USE_TLS = False # 是否使用安全协议传输
EMAIL_USE_SSL = True
安装celery
pip install celery
项目下创建目录 celert_tasks
创建celery异步任务
- celery_tasks/main.py
import os
from celery import Celery
# 把celery和django进行组合,必须让celery能识别和加载django的配置文件以及django的类库
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "fresh.settings")
# 对django框架初始化
import django
django.setup()
# 创建Celery实例 生产者
app = Celery('fresh')
# 加载配置
app.config_from_object('celery_tasks.config')
# 注册任务,多任务写多个
app.autodiscover_tasks(['celery_tasks.email'])
celery_tasks/config.py
# Celery配置文件
# 指定中间人、消息队列、任务队列、容器,使用redis
broker_url = 'redis://127.0.0.1/3'
# 结果队列的链接地址
celery_result_backend = 'redis://127.0.0.1:6379/14'
celery_tasks/email/tasks.py
from celery import Task
from django.core.mail import send_mail
from django.conf import settings
from celery_tasks.main import celery_app
# 监听整个任务的钩子,有时任务会失败
class Mytask(Task):
def on_success(self, retval, task_id, args, kwargs):
print('task success!')
return super(Mytask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('task failed')
# 可以记录到程序中或者任务队列中,让celery尝试重新执行
return super(Mytask, self).on_failure(exc, task_id, args, kwargs, einfo)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
print('this is after return')
return super(Mytask, self).after_return(status, retval, task_id, args, kwargs, einfo)
def on_retry(self, exc, task_id, args, kwargs, einfo):
print('this is retry')
return super(Mytask, self).on_retry(exc, task_id, args, kwargs, einfo)
@celery_app.task(name='send_verify_email', base=Mytask) # name:起别名;base:为监听类
def send_verify_email(to_email, verify_url):
"""定义发送邮件的任务"""
# send_mail('标题','普通邮件的正文','发件人','收件人列表','富文本邮件正文')
project_name = 'test'
subject = project_name + '邮箱验证'
# message = '普通文本'
# html_message是发送带html样式信息
html_message = '<p>尊敬的用户您好!</p>' \
'<p>感谢您使用%s。</p>' \
'<p>您的邮箱为:%s 。请点击此链接激活您的邮箱:</p>' \
'<p><a href="%s">%s<a></p>' % (project_name, to_email, verify_url, verify_url)
send_mail(subject, '', settings.EMAIL_FROM, [to_email], html_message=html_message)
视图调用
from celery_tasks.email.tasks import send_verify_email
class EmailView(APIView):
def post(self, request):
logger = logging.getLogger('django')
send_verify_email.delay(email, verify_url)
return JsonResponse({'code': 0, 'message': '成功'})
启动
# windows测试启动
celery -A alarmManagerBackend worker -l INFO --pool=solo
celery -A alarmManagerBackend beat -l info
celery -A alarmManagerBackend worker -l INFO --pool=solo --logfile=E:\GCloud\API\alarmmanagerbackend\logs\celery.log
celery -A alarmManagerBackend beat -l info --logfile=E:\GCloud\API\alarmmanagerbackend\logs\celery.log
celery -A alarmManagerBackend worker --pool=solo -l info -P eventlet
celery -A alarmManagerBackend beat -l info -P eventlet
# linux启动,不要用root用户
celery multi start -A celery_tasks.main worker -l info --logfile=celery.log --pidfile=celery.pid
celery multi restart -A celery_tasks.main worker -l info --logfile=celery.log --pidfile=celery.pid
celery multi stop -A celery_tasks.main worker -l info --logfile=celery.log --pidfile=celery.pid
# celery进程全杀
ps auxww | grep 'celery' | awk '{print $2}' | xargs kill -9
数据库读写分离
新建 db_router.py
class MasterSlaveDBRouter(object):
"""数据库主从读写分离路由"""
def db_for_read(self, model, **hints):
"""读数据库"""
return "slave"
def db_for_write(self, model, **hints):
"""写数据库"""
return "default"
def allow_relation(self, obj1, obj2, **hints):
"""是否运行关联操作"""
return True
配置 setting.py
DATABASE_ROUTERS = ["app.db_router.MasterSlaveDBRouter"]
生成接口文档
- 环境说明
- python >= 3.6.2
- Jinja2 <= 3.0.0
pip install coreapi
pip install djangorestframework
配置
## setting.py
INSTALLED_APPS = [
...
'rest_framework'
]
REST_FRAMEWORK = {
...
'DEFAULT_SCHEMA_CLASS': 'rest_framework.schemas.AutoSchema',
}
路由
urlpatterns = [
path('docs/', include_docs_urls(title='接口文档')),
]
访问
http://127.0.0.1:8000/docs
单元测试
基本使用
from django.test import TestCase
# 需要测试的函数
def add(a, b):
return a + b
# 普通测试
d = add(1, 3)
assert d == 4 # 验证结果是否相等
class BookTestCase(TestCase):
"""框架测试"""
@classmethod
def setUpClass(cls):
"""所有测试用例之前调用"""
pass
@classmethod
def tearDownClass(cls):
"""所有测试用例之后调用"""
pass
def setUp(self) -> None:
"""前置准备,每个测试方法之前调用"""
pass
def tearDown(self) -> None:
"""后置处理,每个测试方法之后调用"""
pass
def test_1(self): # 测试函数必须以 test 开头
"""测试功能"""
c = add(1, 3)
self.assertEqual(c, 1) # 验证结果是否相等
def test_2(self):
c = add(-1, - 3)
self.assertEqual(c, -4)
# python manage.py test
# python manage.py test 模块名
# python manage.py test 模块名.tests.类名
# python manage.py test 模块名.tests.类名.函数名
模型测试
class UserTestCase(TestCase):
def setUp(self) -> None:
# UserWechat.objects.create(openid='aaa')
pass
def test_user_create(self):
"""测试用户创建"""
User.objects.create(username='aaa', gender='a')
res = User.objects.get(username='aaa')
self.assertEqual(res.username, 'aaa')
def tearDown(self) -> None:
pass
接口测试
class UserAPITestCase(TestCase):
def test_hello(self):
res = self.client.get('/hello')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.json().get('code'), 0)
def test_login(self):
data = {
"openid": "acx",
"nickname": "LP",
"gender": 0
}
res = self.client.post('/login', data=data, content_type='application/json')
self.assertEqual(res.status_code, 200)
Session
import requests
from django.test import TestCase
class UserAPITestCase(TestCase):
@classmethod
def setUpClass(cls):
"""所有测试用例之前调用"""
data = {
"openid": "aaaa",
"nickname": "LP",
"gender": 0
}
cls.s = requests
res = cls.s.post('http://127.0.0.1:8000/fresh/api/user/login', json=data)
print('登录结果', res.json())
pass
@classmethod
def tearDownClass(cls):
"""所有测试用例之后调用"""
pass
def test_logout(self):
"""需要session校验"""
res = self.s.delete('http://127.0.0.1:8000/fresh/api/user/logout')
self.assertEqual(res.status_code, 200)
XAdmin
Admin-simpleui
https://simpleui.72wo.com/docs/simpleui/
Admin
修改admin标题
from django.contrib import admin
admin.site.site_title = '管理后台'
admin.site.site_header = "xx管理后台"
导入导出
pip install django-import-export
## settings.py
INSTALLED_APPS = [
...
'import_export',
...
]
## admin.py
from django.contrib import admin
from import_export import resources
from import_export.admin import ImportExportActionModelAdmin
from .models import Book
# 创建进出口资源
class BookResource(resources.ModelResource):
class Meta:
model = Book
fields = ('id', 'name', 'price') # 导出字段
# exclude = ('imported', ) # 导出排除字段
@admin.register(Book)
class BookAdmin(ImportExportActionModelAdmin):
resource_class = BookResource
list_display = ('id', 'name', 'price', 'create_time')
项目部署
环境
nginx1.20.0
mysql5.7
redis5.0.3
django3.2.0
创建python虚拟环境
conda create -n fresh python==2.6.2
# 安装pip环境
pip install -r requirements.txt
# 安装uwsgi
pip install uwsgi
- pip mysqlclient报错
yum install mysql-devel
- pip uwsgi报错
- 参考:乌斯吉 * Anaconda.org
conda install -c conda-forge/label/gcc7 uwsgi
项目目录下创建 uwsgi.ini 文件
[uwsgi]
# 项目目录
chdir=/root/project/fresh
# 设置日志存储
daemonize=/root/project/fresh/uwsgi.log
# 你项目使用的虚拟环境的根目录 绝对地址
virtualenv = /root/miniconda3/envs/fresh
# 指定socket,真实端口
socket=:8001
# 指定pid文件
pidfile=/root/project/fresh/uwsgi.pid
# 指定项目的wsgi模块
module=fresh.wsgi
# 启用主进程
master=true
# 进程个数
workers=3
# 在每个worker而不是master中加载应用
lazy-apps=true
# 每个进程最大的请求数
max-request = 1000
# 启动uwsgi的用户名和用户组
uid=root
gid=root
# 自动移除unix Socket和pid文件当服务停止的时候
vacuum=true
# 启用线程
enable-threads=true
# 设置自中断时间
harakiri=30
# 设置缓冲
post-buffering=4096
#设置在平滑的重启(直到接收到的请求处理完才重启)一个工作子进程中,等待这个工作结束的最长秒数。这个配置会使在平滑地重启工作子进程中,如果工作进程结束时间超过了8秒就会被强行结束(忽略之前已经接收到的请求而直接结束)
reload-mercy = 8
配置nginx
upstream fresh {
server 127.0.0.1:8001;
}
server {
listen 80;
server_name www.smilelp.top;
location = /ok {
default_type application/json;
return 200 '{"msg":"www.smilelp.top"}';
}
location / {
include uwsgi_params;
uwsgi_pass fresh;
uwsgi_read_timeout 15;
}
location /static {
alias /root/project/fresh/static;
}
}
uwsgi启动 django项目
# 启动
uwsgi --ini uwsgi.ini
# 关闭
uwsgi --stop uwsgi.pid
# 重载nginx
systemcal reload nginx
Docker部署
- docker+nginx+uwsgi+django+python+mysql+redis
https://blog.csdn.net/aafeiyang/article/details/100152373/
Dokerfile
FROM ubuntu:16.04
FROM python:3.6
ENV http_proxy=http://xxx.xxx.xxx.xxx:3128
ENV https_proxy=http://xxx.xxx.xxx.xxx:3128
RUN apt-get -y update && \
apt-get -y upgrade && \
apt-get install -y \
vim \
git \
python3-dev \
python3-setuptools \
python3-pip \
nginx \
supervisor \
nodejs \
npm \
default-libmysqlclient-dev && \
pip3 install --upgrade -i https://pypi.doubanio.com/simple/ pip setuptools && \
rm -rf /var/lib/apt/lists/*
RUN pip3 install -i https://pypi.doubanio.com/simple/ uwsgi
COPY requirements.txt /data/Project/
RUN pip3 install -i https://pypi.doubanio.com/simple/ -r /data/Project/requirements.txt
COPY ./build/nginx/nginx.conf /etc/nginx/nginx.conf
RUN echo "daemon off;" >> /etc/nginx/nginx.conf
COPY ./build/nginx/conf.d /etc/nginx/conf.d
ENV PYTHONIOENCODING=utf-8
EXPOSE 9990 9999
通过docker-compose.yml 将容器运行起来
nginx:
container_name: nginx
image: registry.cn-shenzhen.aliyuncs.com/beni/nginx:latest
ports:
- 80:80
volumes:
- C:/data/django/nginx:/etc/nginx/conf.d
- C:/data/django/www:/data/django/www
links:
- django:django
django:
container_name: django
image: registry.cn-shenzhen.aliyuncs.com/beni/django:latest
volumes:
- C:/data/django/www:/data/django/www
command: uwsgi --ini /data/django/www/mblog/uwsgi.ini