import random
import secrets
from datetime import datetime

from django.contrib.auth.models import User
from django.db import transaction
from django.http import StreamingHttpResponse
from django.utils import timezone
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import permissions
from rest_framework import viewsets
from rest_framework.exceptions import APIException
from rest_framework.filters import OrderingFilter
from rest_framework.response import Response
from rest_framework.settings import api_settings

from staff.models import ListModel as staff
from userprofile.models import Users
from utils.md5 import Md5
from utils.page import MyPageNumberPagination

from . import serializers
from .files import FileRenderCN, FileRenderEN
from .filter import Filter, TypeFilter
from .models import ListModel, TypeListModel
from .models import Role, Permission  # role and permission models
from .serializers import FileRenderSerializer

# Staff types that get the full (non "user"-prefixed) serializers.
_PRIVILEGED_STAFF_TYPES = ('admin', '主管', '管理员', '经理')


def _mutable_payload(data):
    """Return a mutable copy of a request payload.

    ``request.data`` can be an immutable ``QueryDict`` for form-encoded
    requests, so assigning keys to it directly may fail. Copying also keeps
    the view from modifying the request object in place.
    """
    return data.copy()


def _apply_role(data):
    """Replace a role *name* in ``data['role']`` with the role's primary key.

    The role is created when no role with that name exists yet. Payloads
    without a truthy ``role`` entry are left untouched.
    """
    role_name = data.get('role')
    if role_name:
        role, _created = Role.objects.get_or_create(name=role_name)
        data['role'] = role.id


class APIViewSet(viewsets.ModelViewSet):
    """
        retrieve:
            Response a data list(get)

        list:
            Response a data list(all)

        create:
            Create a data line(post)

        delete:
            Delete a data line(delete)

        partial_update:
            Partial_update a data(patch:partial_update)

        update:
            Update a data(put:update)
    """
    pagination_class = MyPageNumberPagination
    filter_backends = [DjangoFilterBackend, OrderingFilter, ]
    ordering_fields = ['id', "create_time", "update_time", ]
    filter_class = Filter

    def list(self, request, *args, **kwargs):
        """Return the staff list using the default ModelViewSet behaviour.

        NOTE: a staff_name/check_code verification with account lockout used
        to live here as commented-out code; it is not active, so this
        override only delegates to the parent implementation.
        """
        return super().list(request, *args, **kwargs)

    def get_project(self):
        """Return the ``pk`` URL keyword argument, or ``None`` when absent."""
        url_kwargs = getattr(self, 'kwargs', None) or {}
        return url_kwargs.get('pk')

    def get_queryset(self):
        """Return non-deleted staff rows, narrowed to ``pk`` when one is given.

        NOTE(review): unlike ``FileDownloadView.get_queryset`` this does not
        filter by the caller's openid -- confirm that is intended.
        """
        pk = self.get_project()
        if not self.request.user:
            return ListModel.objects.none()
        if pk is None:
            return ListModel.objects.filter(is_delete=False)
        return ListModel.objects.filter(id=pk, is_delete=False)

    def get_serializer_class(self):
        """Pick the serializer for the current action and caller's staff type.

        Callers whose staff type is not in ``_PRIVILEGED_STAFF_TYPES`` get the
        ``userStaff*`` serializers; everyone else gets the ``Staff*`` ones.

        Raises:
            APIException: when the caller has no active staff record (this
                previously surfaced as an AttributeError on ``None``).
        """
        staff_record = ListModel.objects.filter(
            openid=self.request.auth.openid, is_delete=False
        ).first()
        if staff_record is None:
            raise APIException({"detail": "Staff record not found"})

        if staff_record.staff_type not in _PRIVILEGED_STAFF_TYPES:
            if self.action in ['list', 'retrieve', 'destroy']:
                return serializers.userStaffGetSerializer
            elif self.action in ['create']:
                return serializers.userStaffPostSerializer
            elif self.action in ['update']:
                return serializers.userStaffUpdateSerializer
            elif self.action in ['partial_update']:
                return serializers.userStaffPartialUpdateSerializer
            else:
                return self.http_method_not_allowed(request=self.request)
        else:
            if self.action in ['list', 'retrieve', 'destroy']:
                return serializers.StaffGetSerializer
            elif self.action in ['create']:
                return serializers.StaffPostSerializer
            elif self.action in ['update']:
                return serializers.StaffUpdateSerializer
            elif self.action in ['partial_update']:
                return serializers.StaffPartialUpdateSerializer
            else:
                return self.http_method_not_allowed(request=self.request)

    def create(self, request, *args, **kwargs):
        """Create a staff row together with its login ``User`` and ``Users`` profile.

        The payload is copied before it is modified. All database writes run
        in a single transaction so that a validation failure does not leave
        an orphan ``User``/``Users`` pair behind (which would also block a
        retry with the same username).

        Raises:
            APIException: when ``staff_name`` is missing, or when an active
                staff row with the same openid and staff_name already exists.
        """
        data = _mutable_payload(self.request.data)
        openid = self.request.auth.openid
        data['openid'] = openid

        staff_name = data.get('staff_name')
        if not staff_name:
            raise APIException({"detail": "staff_name is required"})
        if ListModel.objects.filter(openid=openid, staff_name=staff_name,
                                    is_delete=False).exists():
            raise APIException({"detail": "Data exists"})

        app_code = Md5.md5(staff_name + '1')
        data['appid'] = app_code
        # The check code doubles as the initial password, so draw it from a
        # cryptographically secure source (same 1000..9999 range as before).
        check_code = secrets.randbelow(9000) + 1000
        data['check_code'] = check_code
        ip = request.META.get('HTTP_X_FORWARDED_FOR') or request.META.get('REMOTE_ADDR')

        with transaction.atomic():
            # Resolve (or create) the role by name.
            _apply_role(data)
            # Create the login user.
            user = User.objects.create_user(
                username=str(staff_name),
                password=str(check_code)
            )
            Users.objects.create(user_id=user.id, name=str(data['name']),
                                 openid=app_code, appid=app_code,
                                 t_code=Md5.md5(str(timezone.now())),
                                 developer=1, ip=ip)
            serializer = self.get_serializer(data=data)
            serializer.is_valid(raise_exception=True)
            serializer.save()

        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=200, headers=headers)

    def update(self, request, pk):
        """Fully update a staff row, resolving a role name to its id first.

        NOTE: unlike ``partial_update`` and ``destroy``, no openid ownership
        check is enforced here; that check exists only as disabled code.
        """
        qs = self.get_object()
        data = _mutable_payload(self.request.data)
        # Update the role.
        _apply_role(data)
        serializer = self.get_serializer(qs, data=data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=200, headers=headers)

    def partial_update(self, request, pk):
        """Partially update a staff row owned by the caller.

        Raises:
            APIException: when the row's openid differs from the caller's.
        """
        qs = self.get_object()
        if qs.openid != self.request.auth.openid:
            raise APIException({"detail": "Cannot Update Data Which Not Yours"})
        data = _mutable_payload(self.request.data)
        # Update the role.
        _apply_role(data)
        serializer = self.get_serializer(qs, data=data, partial=True)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=200, headers=headers)

    def destroy(self, request, pk):
        """Soft-delete a staff row owned by the caller (sets ``is_delete``).

        Raises:
            APIException: when the row's openid differs from the caller's.
        """
        qs = self.get_object()
        if qs.openid != self.request.auth.openid:
            raise APIException({"detail": "Cannot Delete Data Which Not Yours"})
        qs.is_delete = True
        qs.save()
        serializer = self.get_serializer(qs, many=False)
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=200, headers=headers)


class RoleViewSet(viewsets.ModelViewSet):
    """Role management API."""
    queryset = Role.objects.all()
    serializer_class = serializers.RoleSerializer
    pagination_class = MyPageNumberPagination
    filter_backends = [DjangoFilterBackend, OrderingFilter]
    ordering_fields = ['id', "name"]

    def get_queryset(self):
        """Return every role."""
        return Role.objects.all()


class PermissionViewSet(viewsets.ModelViewSet):
    """Permission management API."""
    queryset = Permission.objects.all()
    serializer_class = serializers.PermissionSerializer
    # NOTE: the custom pagination class is currently disabled for this endpoint.
    # pagination_class = MyPageNumberPagination
    filter_backends = [DjangoFilterBackend, OrderingFilter]
    ordering_fields = ['id', "page"]

    def get_queryset(self):
        """Return permissions, limited to one role when ``?role=<name>`` is given."""
        role = self.request.query_params.get('role')
        if role:
            return Permission.objects.filter(role__name=role)
        return Permission.objects.all()


class RolePermissionViewSet(viewsets.ViewSet):
    """Role permission configuration API."""

    def list(self, request):
        """Return the distinct role names."""
        roles = Role.objects.values_list('name', flat=True).distinct()
        return Response(list(roles))

    def retrieve(self, request, pk=None):
        """Return the permission configuration of the role named ``pk``."""
        try:
            role = Role.objects.get(name=pk)
        except Role.DoesNotExist:
            return Response({"error": "Role not found"}, status=404)
        serializer = serializers.RoleGETSerializer(role)
        return Response(serializer.data)

    def update(self, request, pk=None):
        """Replace the permissions attached to the role named ``pk``.

        The payload's ``permissions`` list is validated *before* the current
        permissions are cleared, and the replacement runs in one transaction,
        so a malformed entry can no longer wipe a role's permissions and then
        fail half-way. Malformed payloads get a 400 response.
        """
        try:
            role = Role.objects.get(name=pk)
        except Role.DoesNotExist:
            return Response({"error": "Role not found"}, status=404)

        permissions_data = request.data.get('permissions', [])
        payload_ok = isinstance(permissions_data, list) and all(
            isinstance(item, dict) and 'page' in item and 'enabled' in item
            for item in permissions_data
        )
        if not payload_ok:
            return Response({"error": "Invalid permissions payload"}, status=400)

        with transaction.atomic():
            # Drop the existing permissions.
            role.permissions.clear()
            # Attach the new permissions.
            for perm_data in permissions_data:
                # NOTE(review): 'enabled' is only applied when the Permission
                # row is newly created; an existing row keeps its stored
                # value -- confirm this is intended.
                perm, _created = Permission.objects.get_or_create(
                    page=perm_data['page'],
                    component=perm_data.get('component'),
                    defaults={'enabled': perm_data['enabled']}
                )
                role.permissions.add(perm)
        return Response({"message": "Permissions updated successfully"})


class RolePagePermissionViewSet(viewsets.ViewSet):
    """Role page-permission configuration API."""

    def list(self, request):
        """Return the distinct role names."""
        roles = Role.objects.values_list('name', flat=True).distinct()
        return Response(list(roles))

    def retrieve(self, request, pk=None):
        """Return the page-permission configuration of the role named ``pk``."""
        try:
            role = Role.objects.get(name=pk)
        except Role.DoesNotExist:
            return Response({"error": "Role not found"}, status=404)
        serializer = serializers.RolePageGETSerializer(role)
        return Response(serializer.data)

    def get_page_permissions(self, request, pk=None):
        """Return page-access flags for the role named ``pk``.

        When the request body carries ``primary_page``, only permissions under
        that primary page are considered.

        NOTE(review): the filter is read from ``request.data`` (the body), not
        from the query string -- confirm against how this action is routed.
        """
        try:
            role = Role.objects.get(name=pk)
        except Role.DoesNotExist:
            return Response({"error": "Role not found"}, status=404)

        primary_page = request.data.get('primary_page')
        if primary_page:
            selected = role.permissions.filter(primary_page=primary_page)
        else:
            selected = role.permissions.all()
        return Response(self.get_permissions_group(selected))

    def get_permissions_group(self, permissions):
        """Collapse page-level permissions into ``[{"page", "enabled"}, ...]``.

        Only permissions whose ``component`` is ``None`` (page access rather
        than component access) are used; when a page occurs more than once,
        the last ``enabled`` value wins.
        """
        page_access = {
            perm.page: perm.enabled
            for perm in permissions
            if perm.component is None
        }
        # Convert to the list-of-objects shape that is returned to the client.
        return [{"page": page, "enabled": enabled}
                for page, enabled in page_access.items()]


class RolePageComponentPermissionViewSet(viewsets.ViewSet):
    """Role component-permission configuration API."""

    def get_page_component_permissions(self, request, pk=None):
        """Return component-access flags for the role named ``pk``.

        When the request body carries ``page``, only permissions on that page
        are considered.

        NOTE(review): the filter is read from ``request.data`` (the body), not
        from the query string -- confirm against how this action is routed.
        """
        try:
            role = Role.objects.get(name=pk)
        except Role.DoesNotExist:
            return Response({"error": "Role not found"}, status=404)

        page = request.data.get('page')
        if page:
            selected = role.permissions.filter(page=page)
        else:
            selected = role.permissions.all()
        return Response(self.get_permissions_group(selected))

    def get_permissions_group(self, permissions):
        """Collapse component permissions into ``[{"component", "enabled"}, ...]``.

        Permissions without a component are skipped; when a component occurs
        more than once, the last ``enabled`` value wins.
        """
        component_access = {
            perm.component: perm.enabled
            for perm in permissions
            if perm.component is not None
        }
        # Convert to the list-of-objects shape that is returned to the client.
        return [{"component": component, "enabled": enabled}
                for component, enabled in component_access.items()]


class TypeAPIViewSet(viewsets.ModelViewSet):
    """
        list:
            Response a data list(all)
    """
    pagination_class = MyPageNumberPagination
    filter_backends = [DjangoFilterBackend, OrderingFilter, ]
    ordering_fields = ['id', "create_time", "update_time", ]
    filter_class = TypeFilter

    def get_queryset(self):
        """Return the staff types stored under the ``init_data`` openid."""
        if not self.request.user:
            return TypeListModel.objects.none()
        return TypeListModel.objects.filter(openid='init_data')

    def get_serializer_class(self):
        """Return the list serializer; any other action is rejected."""
        if self.action in ['list']:
            return serializers.StaffTypeGetSerializer
        return self.http_method_not_allowed(request=self.request)


class FileDownloadView(viewsets.ModelViewSet):
    """Export the caller's staff rows as a streamed CSV download."""
    renderer_classes = (FileRenderCN,) + tuple(api_settings.DEFAULT_RENDERER_CLASSES)
    filter_backends = [DjangoFilterBackend, OrderingFilter, ]
    ordering_fields = ['id', "create_time", "update_time", ]
    filter_class = Filter

    def get_project(self):
        """Return the ``pk`` URL keyword argument, or ``None`` when absent."""
        url_kwargs = getattr(self, 'kwargs', None) or {}
        return url_kwargs.get('pk')

    def get_queryset(self):
        """Return the caller's non-deleted staff rows, narrowed to ``pk`` if given."""
        pk = self.get_project()
        if not self.request.user:
            return ListModel.objects.none()
        if pk is None:
            return ListModel.objects.filter(openid=self.request.auth.openid, is_delete=False)
        return ListModel.objects.filter(openid=self.request.auth.openid, id=pk, is_delete=False)

    def get_serializer_class(self):
        """Return the file-render serializer; any other action is rejected."""
        if self.action in ['list']:
            return serializers.FileRenderSerializer
        return self.http_method_not_allowed(request=self.request)

    def get_lang(self, data):
        """Render ``data`` with the Chinese renderer when the ``language``
        request header is ``zh-hans``, otherwise with the English renderer."""
        lang = self.request.META.get('HTTP_LANGUAGE')
        renderer = FileRenderCN() if lang == 'zh-hans' else FileRenderEN()
        return renderer.render(data)

    def list(self, request, *args, **kwargs):
        """Stream the filtered staff rows as a timestamp-named CSV attachment."""
        stamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
        # Lazily serialize rows so the response can be streamed.
        rows = (
            FileRenderSerializer(instance).data
            for instance in self.filter_queryset(self.get_queryset())
        )
        response = StreamingHttpResponse(
            self.get_lang(rows),
            content_type="text/csv"
        )
        response['Content-Disposition'] = "attachment; filename='staff_{}.csv'".format(stamp)
        return response