Django REST framework 中的 GenericAPIView 配合拓展类 mixin 或者视图集 viewset 可以复用其代码,减少自己编写的代码量。下面我要实现自己的视图类,以同样的方式减少代码量。首先新建一个 myView.py:
from collections import OrderedDict

from django.core.exceptions import MultipleObjectsReturned
from django.core.exceptions import ObjectDoesNotExist
from django.core.exceptions import ValidationError as DjangoValidationError
from rest_framework import status
from rest_framework.generics import GenericAPIView
from rest_framework.pagination import PageNumberPagination
from rest_framework.response import Response
from rest_framework.settings import api_settings
from rest_framework.views import APIView

# Errors an ORM lookup can raise for a missing row or a malformed lookup
# value. Anything else (e.g. a misconfigured view) is allowed to propagate
# instead of being reported to the client as "not found".
_LOOKUP_ERRORS = (
    ObjectDoesNotExist,
    MultipleObjectsReturned,
    DjangoValidationError,
    TypeError,
    ValueError,
)


def _prepare_queryset(view):
    """Return the view's queryset after filtering and ordering."""
    queryset = view.get_queryset()
    queryset = view.filter_queryset(queryset)
    return view.order_by_queryset(queryset)


class MyView(APIView):
    """APIView with hand-written queryset, serializer and pagination helpers.

    Subclasses set the class attributes below and combine this view with the
    ``Mixin*`` classes of this module to expose the HTTP methods they need.
    """

    queryset = None  # model queryset
    serializer_class = None  # serializer class
    filter_class = []  # query-parameter names, used verbatim as ORM lookups
    lookup_field = "id"  # field used to find rows for update / delete
    # (sic) the attribute name is misspelled; kept because subclasses set it.
    ordeing_field = ("id",)  # ordering fields
    pagination_class = None  # paginator class

    def get_queryset(self):
        """Return a fresh copy of the configured queryset."""
        assert self.queryset is not None, (
            "'%s' should either include a `queryset` attribute, "
            "or override the `get_queryset()` method."
            % self.__class__.__name__
        )
        return self.queryset.all()

    def filter_queryset(self, queryset):
        """Filter ``queryset`` by every non-empty ``filter_class`` parameter."""
        params = self.request.query_params.dict()
        search_dict = {}
        for name in self.filter_class:
            value = params.get(name)
            if value:
                search_dict[name] = value
        return queryset.filter(**search_dict)

    def get_serializer(self, *args, **kwargs):
        """Instantiate the serializer class with the view context."""
        serializer_class = self.get_serializer_class()
        kwargs.setdefault("context", self.get_serializer_context())
        return serializer_class(*args, **kwargs)

    def get_serializer_class(self):
        """Return the configured serializer class, asserting that it is set."""
        assert self.serializer_class is not None, (
            "'%s' should either include a `serializer_class` attribute, "
            "or override the `get_serializer_class()` method."
            % self.__class__.__name__
        )
        return self.serializer_class

    def get_serializer_context(self):
        """Return the extra context handed to serializers by get_serializer."""
        return {
            "request": self.request,
            "format": self.format_kwarg,
            "view": self,
        }

    def create(self, request, *args, **kwargs):
        """Create one row from ``request.data``."""
        serializer_class = self.get_serializer_class()
        serializer = serializer_class(data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        """Update the row whose ``lookup_field`` equals the value in the body."""
        queryset = self.get_queryset()
        lookup = {self.lookup_field: request.data.get(self.lookup_field)}
        try:
            instance = queryset.get(**lookup)
        except _LOOKUP_ERRORS:
            return Response(
                {"state": "fail", "msg": "未找到该数据"},
                status=status.HTTP_400_BAD_REQUEST,
            )
        serializer_class = self.get_serializer_class()
        serializer = serializer_class(instance=instance, data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        # NOTE(review): 201 is kept for existing clients; 200 would be the
        # conventional status for an update.
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def destroy(self, request, *args, **kwargs):
        """Delete every row whose ``lookup_field`` is in the submitted array.

        A single scalar value is treated as a one-element array, so a string
        such as ``"12"`` is no longer iterated character by character.
        """
        values = request.data.get(self.lookup_field)
        if values is not None and not isinstance(values, (list, tuple, set)):
            values = [values]
        try:
            if values is None:
                raise ValueError("missing lookup values")
            matches = self.get_queryset().filter(
                **{self.lookup_field + "__in": values}
            )
        except _LOOKUP_ERRORS:
            return Response(
                {"state": "fail", "msg": "未找到数据"},
                status=status.HTTP_400_BAD_REQUEST,
            )
        matches.delete()
        # NOTE(review): a 204 response is not supposed to carry a body; the
        # status is kept for existing clients.
        return Response(
            {"state": "success", "msg": "删除成功"},
            status=status.HTTP_204_NO_CONTENT,
        )

    @property
    def paginator(self):
        """Lazily created paginator instance, or None without a class."""
        if not hasattr(self, "_paginator"):
            if self.pagination_class is None:
                self._paginator = None
            else:
                self._paginator = self.pagination_class()
        return self._paginator

    def get_paginate_queryset(self, queryset):
        """Return the requested page, or None when pagination is disabled."""
        paginator = self.paginator
        if paginator is None:
            return None
        return paginator.paginate_queryset(
            queryset=queryset, request=self.request, view=self
        )

    def get_paginated_response(self, data):
        """Wrap serialized page data in the paginator's response."""
        assert self.paginator is not None
        return self.paginator.get_paginated_response(data)

    def order_by_queryset(self, queryset):
        """Order ``queryset`` by ``ordeing_field``."""
        return queryset.order_by(*self.ordeing_field)


class MyPagination(PageNumberPagination):
    """Page-number pagination returning ``total`` and ``data`` only."""

    page_size = 10  # default number of rows per page
    max_page_size = 50  # upper bound on rows per page
    page_size_query_param = "page_size"  # URL parameter for the page size
    page_query_param = "page_num"  # URL parameter for the page number

    def get_paginated_response(self, data):
        """Return the total row count together with the page data."""
        return Response(OrderedDict([
            ("total", self.page.paginator.count),
            ("data", data),
        ]))


class MixinGetPageList:
    """GET returns paginated data only."""

    def get(self, request, *args, **kwargs):
        queryset = _prepare_queryset(self)
        serializer_class = self.get_serializer_class()
        page = self.get_paginate_queryset(queryset)
        serializer = serializer_class(page, many=True)
        return self.get_paginated_response(serializer.data)


class MixinGetAllList:
    """GET returns all rows, never paginated."""

    def get(self, request, *args, **kwargs):
        queryset = _prepare_queryset(self)
        serializer_class = self.get_serializer_class()
        serializer = serializer_class(queryset, many=True)
        return Response(serializer.data)


class MixinGetList:
    """GET returns a page, or all rows when the ``all`` parameter is sent."""

    all_serializer_class = None  # serializer used for the unpaginated list

    def get(self, request, *args, **kwargs):
        queryset = _prepare_queryset(self)
        wants_all = request.query_params.get("all") is not None
        if self.pagination_class is not None and not wants_all:
            serializer_class = self.get_serializer_class()
            page = self.get_paginate_queryset(queryset)
            serializer = serializer_class(page, many=True)
            return self.get_paginated_response(serializer.data)
        # Fall back to the regular serializer instead of overwriting
        # self.serializer_class with a possibly unset all_serializer_class.
        serializer_class = self.all_serializer_class
        if serializer_class is None:
            serializer_class = self.get_serializer_class()
        serializer = serializer_class(queryset, many=True)
        return Response(serializer.data)


class MixinPostCreateModel:
    """POST creates a row."""

    def post(self, request, *args, **kwargs):
        return self.create(request, *args, **kwargs)


class MixinPutUpdateModel:
    """PUT updates a row."""

    def put(self, request, *args, **kwargs):
        return self.update(request, *args, **kwargs)


class MixinDeleteDestroyModel:
    """DELETE removes rows."""

    def delete(self, request, *args, **kwargs):
        return self.destroy(request, *args, **kwargs)


class MyMixin(MyView):
    """Create, delete, update and list in one view."""

    def get(self, request, *args, **kwargs):
        queryset = _prepare_queryset(self)
        serializer_class = self.get_serializer_class()
        if self.pagination_class is not None:
            page = self.get_paginate_queryset(queryset)
            serializer = serializer_class(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = serializer_class(queryset, many=True)
        return Response(serializer.data)

    def post(self, request, *args, **kwargs):
        return self.create(request, *args, **kwargs)

    def put(self, request, *args, **kwargs):
        return self.update(request, *args, **kwargs)

    def delete(self, request, *args, **kwargs):
        return self.destroy(request, *args, **kwargs)
定义的数据库模型,也就是models.py的模型
class Gender(models.Model):
    # Single text column holding the display name.
    name = models.CharField(verbose_name="名字", max_length=16)

    class Meta:
        # Explicit table name instead of the app-prefixed default.
        db_table = "gender"
序列化文件serializer.py
【资料图】
class GenderSerializer(serializers.ModelSerializer):
    """Serializer exposing every field of ``models.Gender``."""

    class Meta:
        model = models.Gender
        fields = "__all__"

    def create(self, validated_data):
        """Insert a new Gender row from the validated payload."""
        return models.Gender.objects.create(**validated_data)

    def update(self, instance, validated_data):
        """Apply the validated payload to ``instance`` and save it.

        The current name is kept when the payload carries no ``name`` key
        (partial update); previously the name was overwritten with None.
        """
        instance.name = validated_data.get("name", instance.name)
        instance.save()
        return instance
然后新建一个视图文件
from ani import models
from ani.serializer import GenderSerializer
from utils.myView import (
    MyPagination,
    MyView,
    MixinGetList,
    MixinPostCreateModel,
    MixinPutUpdateModel,
    MixinDeleteDestroyModel,
)


# List / create / update / delete endpoint for Gender; each HTTP method comes
# from one of the mixins, the shared helpers come from MyView.
class GenderView(
    MyView,
    MixinGetList,
    MixinPostCreateModel,
    MixinPutUpdateModel,
    MixinDeleteDestroyModel,
):
    queryset = models.Gender.objects.all()

    # The same serializer is used for the paginated and the "all" listing.
    serializer_class = GenderSerializer
    all_serializer_class = GenderSerializer

    # Query parameter "name__icontains" is applied as an ORM filter.
    filter_class = ["name__icontains"]
    pagination_class = MyPagination

    lookup_field = "id"
    ordeing_field = ("-id",)  # newest first
视图类依次继承 get、post、put、delete 对应的 mixin 类,分别实现查、增、改、删。
接下来对请求参数,及返回参数进行加密,加解密可以看我之前的文章
先新建一个MyResponse.py,自定义自己的返回类
import json

from rest_framework.response import Response
from rest_framework.utils.encoders import JSONEncoder

from utils.encryption import setDataAes


class AESResponse(Response):
    """Response whose payload is JSON-encoded and passed through setDataAes.

    Args:
        data: payload to serialize to JSON before encryption.
        secret: key handed to ``setDataAes``. NOTE: this is the second
            positional parameter, so ``status`` should be passed by keyword.
        status, template_name, headers, exception, content_type: forwarded
            unchanged to ``Response.__init__``.
    """

    def __init__(self, data=None, secret="www", status=None, template_name=None,
                 headers=None, exception=False, content_type=None):
        # DRF's JSONEncoder only kicks in for values plain json cannot encode
        # (datetime, Decimal, UUID, ...), so the output for ordinary data is
        # unchanged.
        plain_text = json.dumps(data, cls=JSONEncoder)
        enaes_data = setDataAes(secret, plain_text)
        super().__init__(
            data=enaes_data,
            status=status,
            template_name=template_name,
            headers=headers,
            exception=exception,
            content_type=content_type,
        )
将myView.py中的Response都替换为自定义返回类,新建了一个myViewEncryp.py
from collections import OrderedDict

from django.core.exceptions import MultipleObjectsReturned
from django.core.exceptions import ObjectDoesNotExist
from django.core.exceptions import ValidationError as DjangoValidationError
from rest_framework import status
from rest_framework.generics import GenericAPIView
from rest_framework.pagination import PageNumberPagination
from rest_framework.settings import api_settings
from rest_framework.views import APIView

from utils.MyResponse import AESResponse
from utils.tools import get_secret

# Errors an ORM lookup can raise for a missing row or a malformed lookup
# value. Anything else (e.g. a misconfigured view) is allowed to propagate
# instead of being reported to the client as "not found".
_LOOKUP_ERRORS = (
    ObjectDoesNotExist,
    MultipleObjectsReturned,
    DjangoValidationError,
    TypeError,
    ValueError,
)


def _prepare_queryset(view):
    """Return the view's queryset after filtering and ordering."""
    queryset = view.get_queryset()
    queryset = view.filter_queryset(queryset)
    return view.order_by_queryset(queryset)


class MyView(APIView):
    """APIView helpers whose responses are all returned as AESResponse.

    Subclasses set the class attributes below and combine this view with the
    ``Mixin*`` classes of this module to expose the HTTP methods they need.
    """

    queryset = None  # model queryset
    serializer_class = None  # serializer class
    filter_class = []  # query-parameter names, used verbatim as ORM lookups
    lookup_field = "id"  # field used to find rows for update / delete
    # (sic) the attribute name is misspelled; kept because subclasses set it.
    ordeing_field = ("id",)  # ordering fields
    pagination_class = None  # paginator class

    def get_queryset(self):
        """Return a fresh copy of the configured queryset."""
        assert self.queryset is not None, (
            "'%s' should either include a `queryset` attribute, "
            "or override the `get_queryset()` method."
            % self.__class__.__name__
        )
        return self.queryset.all()

    def filter_queryset(self, queryset):
        """Filter ``queryset`` by every non-empty ``filter_class`` parameter."""
        params = self.request.query_params.dict()
        search_dict = {}
        for name in self.filter_class:
            value = params.get(name)
            if value:
                search_dict[name] = value
        return queryset.filter(**search_dict)

    def get_serializer(self, *args, **kwargs):
        """Instantiate the serializer class with the view context."""
        serializer_class = self.get_serializer_class()
        kwargs.setdefault("context", self.get_serializer_context())
        return serializer_class(*args, **kwargs)

    def get_serializer_class(self):
        """Return the configured serializer class, asserting that it is set."""
        assert self.serializer_class is not None, (
            "'%s' should either include a `serializer_class` attribute, "
            "or override the `get_serializer_class()` method."
            % self.__class__.__name__
        )
        return self.serializer_class

    def get_serializer_context(self):
        """Return the extra context handed to serializers by get_serializer."""
        return {
            "request": self.request,
            "format": self.format_kwarg,
            "view": self,
        }

    def create(self, request, *args, **kwargs):
        """Create one row from ``request.data``."""
        serializer_class = self.get_serializer_class()
        serializer = serializer_class(data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        secret = get_secret(request)
        return AESResponse(
            serializer.data, secret=secret, status=status.HTTP_201_CREATED
        )

    def update(self, request, *args, **kwargs):
        """Update the row whose ``lookup_field`` equals the value in the body."""
        secret = get_secret(request)
        queryset = self.get_queryset()
        lookup = {self.lookup_field: request.data.get(self.lookup_field)}
        try:
            instance = queryset.get(**lookup)
        except _LOOKUP_ERRORS:
            return AESResponse(
                {"state": "fail", "msg": "未找到该数据"},
                secret=secret,
                status=status.HTTP_400_BAD_REQUEST,
            )
        serializer_class = self.get_serializer_class()
        serializer = serializer_class(instance=instance, data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        # NOTE(review): 201 is kept for existing clients; 200 would be the
        # conventional status for an update.
        return AESResponse(
            serializer.data, secret=secret, status=status.HTTP_201_CREATED
        )

    def destroy(self, request, *args, **kwargs):
        """Delete every row whose ``lookup_field`` is in the submitted array.

        A single scalar value is treated as a one-element array, so a string
        such as ``"12"`` is no longer iterated character by character.
        """
        secret = get_secret(request)
        values = request.data.get(self.lookup_field)
        if values is not None and not isinstance(values, (list, tuple, set)):
            values = [values]
        try:
            if values is None:
                raise ValueError("missing lookup values")
            matches = self.get_queryset().filter(
                **{self.lookup_field + "__in": values}
            )
        except _LOOKUP_ERRORS:
            return AESResponse(
                {"state": "fail", "msg": "未找到数据"},
                secret=secret,
                status=status.HTTP_400_BAD_REQUEST,
            )
        matches.delete()
        # NOTE(review): a 204 response is not supposed to carry a body; the
        # status is kept for existing clients.
        return AESResponse(
            {"state": "success", "msg": "删除成功"},
            secret=secret,
            status=status.HTTP_204_NO_CONTENT,
        )

    @property
    def paginator(self):
        """Lazily created paginator instance, or None without a class."""
        if not hasattr(self, "_paginator"):
            if self.pagination_class is None:
                self._paginator = None
            else:
                self._paginator = self.pagination_class()
        return self._paginator

    def get_paginate_queryset(self, queryset):
        """Return the requested page, or None when pagination is disabled."""
        paginator = self.paginator
        if paginator is None:
            return None
        return paginator.paginate_queryset(
            queryset=queryset, request=self.request, view=self
        )

    def get_paginated_response(self, data):
        """Wrap serialized page data in the paginator's response."""
        assert self.paginator is not None
        return self.paginator.get_paginated_response(data)

    def order_by_queryset(self, queryset):
        """Order ``queryset`` by ``ordeing_field``."""
        return queryset.order_by(*self.ordeing_field)


class MyPagination(PageNumberPagination):
    """Page-number pagination returning an encrypted ``total`` / ``data``."""

    page_size = 10  # default number of rows per page
    max_page_size = 50  # upper bound on rows per page
    page_size_query_param = "page_size"  # URL parameter for the page size
    page_query_param = "page_num"  # URL parameter for the page number

    def get_paginated_response(self, data):
        """Return the total row count together with the page data."""
        secret = get_secret(self.request)
        return AESResponse(OrderedDict([
            ("total", self.page.paginator.count),
            ("data", data),
        ]), secret=secret)


class MixinGetPageList:
    """GET returns paginated data only."""

    def get(self, request, *args, **kwargs):
        queryset = _prepare_queryset(self)
        serializer_class = self.get_serializer_class()
        page = self.get_paginate_queryset(queryset)
        serializer = serializer_class(page, many=True)
        return self.get_paginated_response(serializer.data)


class MixinGetAllList:
    """GET returns all rows, never paginated."""

    def get(self, request, *args, **kwargs):
        queryset = _prepare_queryset(self)
        serializer_class = self.get_serializer_class()
        serializer = serializer_class(queryset, many=True)
        secret = get_secret(request)
        return AESResponse(serializer.data, secret=secret)


class MixinGetList:
    """GET returns a page, or all rows when the ``all`` parameter is sent."""

    all_serializer_class = None  # serializer used for the unpaginated list

    def get(self, request, *args, **kwargs):
        queryset = _prepare_queryset(self)
        wants_all = request.query_params.get("all") is not None
        if self.pagination_class is not None and not wants_all:
            serializer_class = self.get_serializer_class()
            page = self.get_paginate_queryset(queryset)
            serializer = serializer_class(page, many=True)
            return self.get_paginated_response(serializer.data)
        # Fall back to the regular serializer instead of overwriting
        # self.serializer_class with a possibly unset all_serializer_class.
        serializer_class = self.all_serializer_class
        if serializer_class is None:
            serializer_class = self.get_serializer_class()
        serializer = serializer_class(queryset, many=True)
        secret = get_secret(request)
        return AESResponse(serializer.data, secret=secret)


class MixinPostCreateModel:
    """POST creates a row."""

    def post(self, request, *args, **kwargs):
        return self.create(request, *args, **kwargs)


class MixinPutUpdateModel:
    """PUT updates a row."""

    def put(self, request, *args, **kwargs):
        return self.update(request, *args, **kwargs)


class MixinDeleteDestroyModel:
    """DELETE removes rows."""

    def delete(self, request, *args, **kwargs):
        return self.destroy(request, *args, **kwargs)


class MyMixin(MyView):
    """Create, delete, update and list in one view, with encrypted output."""

    def get(self, request, *args, **kwargs):
        queryset = _prepare_queryset(self)
        serializer_class = self.get_serializer_class()
        if self.pagination_class is not None:
            page = self.get_paginate_queryset(queryset)
            serializer = serializer_class(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = serializer_class(queryset, many=True)
        secret = get_secret(request)
        return AESResponse(serializer.data, secret=secret)

    def post(self, request, *args, **kwargs):
        return self.create(request, *args, **kwargs)

    def put(self, request, *args, **kwargs):
        return self.update(request, *args, **kwargs)

    def delete(self, request, *args, **kwargs):
        return self.destroy(request, *args, **kwargs)
其中加密的密钥是用户的 token(请求头 Authorization);如果请求中没有 token,则使用写死的字符串。tools.py:
def get_secret(request):
    """Return the encryption key for ``request``.

    The key is the value stored under ``HTTP_AUTHORIZATION`` in
    ``request.META``; when that value is missing or empty the fixed string
    ``"wchime"`` is returned instead.
    """
    token = request.META.get("HTTP_AUTHORIZATION")
    if token:
        return token
    return "wchime"
如果前端的请求参数加密,那么需要对参数解密,使用装饰器,decorators.py
def request_decrypt(func):
    """Decorate a handler so the encrypted ``text`` field is decrypted first.

    Only ``request.data`` is touched, and it must be a mutable mapping: the
    ``text`` entry is decrypted with the key from ``get_secret``, removed, and
    replaced by the keys of the decrypted JSON object. Requests without a
    ``text`` entry, or whose decryption yields nothing, are passed through
    unchanged.
    """
    from functools import wraps

    @wraps(func)  # keep the handler's name and docstring on the wrapper
    def wrap(request, *args, **kwargs):
        data = request.data
        encrypted = data.get("text")
        if encrypted:
            secret = get_secret(request)
            decrypt_data = getDataAes(secret, encrypted)
            if decrypt_data:
                payload = json.loads(decrypt_data)
                del data["text"]
                data.update(payload)
        return func(request, *args, **kwargs)

    return wrap
这时候视图文件需要装饰器解密
from django.utils.decorators import method_decorator

from ani import models
from ani.serializer import GenderSerializer
from utils.decorators import request_decrypt
from utils.myViewEncryp import (
    MyPagination,
    MyView,
    MixinGetList,
    MixinPostCreateModel,
    MixinPutUpdateModel,
    MixinDeleteDestroyModel,
)


# Encrypted variant of the Gender endpoint: the request body of put, delete
# and post is decrypted by request_decrypt before the handler runs; get is
# left undecorated.
@method_decorator(request_decrypt, name="put")
@method_decorator(request_decrypt, name="delete")
@method_decorator(request_decrypt, name="post")
class GenderView(
    MyView,
    MixinGetList,
    MixinPostCreateModel,
    MixinPutUpdateModel,
    MixinDeleteDestroyModel,
):
    queryset = models.Gender.objects.all()

    # The same serializer is used for the paginated and the "all" listing.
    serializer_class = GenderSerializer
    all_serializer_class = GenderSerializer

    # Query parameter "name__icontains" is applied as an ORM filter.
    filter_class = ["name__icontains"]
    pagination_class = MyPagination

    lookup_field = "id"
    ordeing_field = ("-id",)  # newest first
项目文件结构
请求提交参数脚本
import json

import requests

from encryption import setDataAes, getDataAes

# Key used when the request carries no Authorization header.
SECRET = "wchime"
URL = "http://127.0.0.1:4000/ani/gender"


def main():
    """Post one encrypted payload and print the raw and decrypted reply."""
    d = {"name": "aaa"}
    body = setDataAes(SECRET, json.dumps(d))
    print(body)
    data = {"text": body}
    # Without a timeout requests waits forever if the server never answers.
    res = requests.post(URL, json=data, timeout=10)
    print(res.text)
    print(getDataAes(SECRET, res.text))


if __name__ == "__main__":
    main()
打印结果