from rest_framework import viewsets
from utils.page import MyPageNumberPagination
from utils.datasolve import sumOfList, transportation_calculate
from utils.md5 import Md5
from rest_framework.filters import OrderingFilter
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.response import Response
from rest_framework.exceptions import APIException
from django.utils import timezone
from django.db import transaction
import logging
from rest_framework import status
from .models import DeviceModel, LocationModel, LocationGroupModel, LocationContainerLink, LocationChangeLog, ContainerDetailModel
from bound.models import BoundBatchModel, BoundDetailModel, BoundListModel
from .filter import DeviceFilter, LocationFilter, LocationContainerLinkFilter, LocationChangeLogFilter, LocationGroupFilter
from .serializers import LocationListSerializer, LocationPostSerializer
from .serializers import LocationGroupListSerializer, LocationGroupPostSerializer
# When adding modules later, they only need to be added here.
from rest_framework.permissions import AllowAny
# NOTE: ContainerDetailModel is imported twice; this later import is the one
# bound at runtime. The import order is kept on purpose so that does not change.
from container.models import (
    ContainerListModel,
    ContainerDetailModel,
    ContainerOperationModel,
    TaskModel,
)

logger = logging.getLogger(__name__)


def _mutable_payload(request):
    """Return a plain, mutable ``dict`` copy of ``request.data``.

    Form and multipart bodies arrive as an immutable ``QueryDict``; writing to
    it raises ``AttributeError``. Copying also avoids mutating the request
    object that DRF hands to other components.
    """
    payload = request.data
    if hasattr(payload, "dict"):
        # QueryDict.dict() flattens the multi-value mapping to a plain dict.
        return payload.dict()
    return dict(payload)


# Location allocation
class LocationAllocation:
    """Location allocation helpers."""

    @transaction.atomic
    def get_pallet_count_by_batch(self, container_code):
        """Count the pallets in the batch that a given pallet belongs to.

        :param container_code: pallet (container) code to look up
        :return: ``{"batch_code", "total_pallets", "current_pallet"}`` on
            success, or ``None`` (after logging an error) when the container
            detail, its bound detail, or the batch cannot be found
        """
        # 1. Resolve the container detail from the pallet code.
        container_detail = (
            ContainerDetailModel.objects
            .filter(container_code=container_code)
            .select_related('bound_detail')
            .first()
        )
        if not container_detail:
            logger.error(f"托盘 {container_code} 不存在")
            return None

        # 2. Follow the link to the batch detail row.
        bound_detail = container_detail.bound_detail
        if not bound_detail:
            logger.error(f"容器 {container_detail.container_code} 不存在")
            return None

        # 3. Load the parent batch.
        batch = (
            BoundBatchModel.objects
            .filter(batch_code=bound_detail.batch_code)
            .prefetch_related('bound_details')
            .first()
        )
        if not batch:
            logger.error(f"批次 {bound_detail.batch_code} 不存在")
            return None

        # 4. Count pallets in the batch. This counts the related detail rows,
        #    i.e. it assumes one detail per pallet. If a detail can hold
        #    several containers, sum each detail's container count instead.
        total_pallets = batch.bound_details.count()

        return {
            "batch_code": batch.batch_code,
            "total_pallets": total_pallets,
            "current_pallet": container_code,
        }


class locationViewSet(viewsets.ModelViewSet):
    """
        retrieve:
            Response a data line (get)
        list:
            Response a data list (all)
        create:
            Create a data line (post)
        delete:
            Delete a data line (delete)
    """
    # authentication_classes = []  # disable all authentication classes
    # permission_classes = [AllowAny]  # allow any access
    pagination_class = MyPageNumberPagination
    filter_backends = [DjangoFilterBackend, OrderingFilter, ]
    ordering_fields = ['id', "create_time", "update_time", ]
    # NOTE(review): newer django-filter releases read `filterset_class`
    # instead of `filter_class` - confirm against the installed version.
    filter_class = LocationFilter

    def get_project(self):
        """Return the ``pk`` URL kwarg, or ``None`` when the route has none."""
        return getattr(self, 'kwargs', {}).get('pk')

    def get_queryset(self):
        """Return all locations, or only the one matching the ``pk`` URL kwarg.

        An empty queryset is returned when the request carries no user object.
        """
        pk = self.get_project()
        if not self.request.user:
            return LocationModel.objects.none()
        if pk is None:
            return LocationModel.objects.all()
        return LocationModel.objects.filter(id=pk)

    def get_serializer_class(self):
        """Pick the serializer for the current action.

        Write actions use the Post serializer; every other action (including
        ``destroy`` and metadata requests) falls back to the List serializer so
        DRF never receives ``None``.
        """
        if self.action in ('create', 'update', 'partial_update'):
            return LocationPostSerializer
        return LocationListSerializer

    def update(self, request, *args, **kwargs):
        """Look up a location by ``location_code`` and create it if missing.

        The payload is echoed back with ``month`` (current ``YYYYMM``) and the
        ``id`` of the existing or newly created location filled in.

        :raises rest_framework.exceptions.ValidationError: when a new location
            has to be created and the payload does not validate
        """
        data = _mutable_payload(request)
        data['month'] = str(timezone.now().strftime('%Y%m'))
        location_code = data.get('location_code')

        # Resolve the location object.
        location_obj = LocationModel.objects.filter(location_code=location_code).first()
        if location_obj:
            data['id'] = location_obj.id
            logger.info(f"库位 {location_code} 已存在")
        else:
            logger.info(f"库位 {location_code} 不存在,创建库位对象")
            serializer_list = LocationPostSerializer(data=data)
            serializer_list.is_valid(raise_exception=True)
            serializer_list.save()
            data['id'] = serializer_list.data.get('id')

        # NOTE(review): 201 is returned even when the location already existed;
        # kept as-is because existing clients may rely on it.
        return Response(data, status=status.HTTP_201_CREATED)


class locationGroupViewSet(viewsets.ModelViewSet):
    """
        retrieve:
            Response a data line (get)
        list:
            Response a data list (all)
        create:
            Create a data line (post)
        delete:
            Delete a data line (delete)
    """
    # authentication_classes = []  # disable all authentication classes
    # permission_classes = [AllowAny]  # allow any access
    pagination_class = MyPageNumberPagination
    filter_backends = [DjangoFilterBackend, OrderingFilter, ]
    ordering_fields = ['id', "create_time", "update_time", ]
    # NOTE(review): newer django-filter releases read `filterset_class`
    # instead of `filter_class` - confirm against the installed version.
    filter_class = LocationGroupFilter

    def get_project(self):
        """Return the ``pk`` URL kwarg, or ``None`` when the route has none."""
        return getattr(self, 'kwargs', {}).get('pk')

    def get_queryset(self):
        """Return all location groups, or only the one matching ``pk``.

        An empty queryset is returned when the request carries no user object.
        """
        pk = self.get_project()
        if not self.request.user:
            return LocationGroupModel.objects.none()
        if pk is None:
            return LocationGroupModel.objects.all()
        return LocationGroupModel.objects.filter(id=pk)

    def get_serializer_class(self):
        """Pick the serializer for the current action.

        Write actions use the Post serializer; every other action (including
        ``destroy`` and metadata requests) falls back to the List serializer so
        DRF never receives ``None``.
        """
        if self.action in ('create', 'update', 'partial_update'):
            return LocationGroupPostSerializer
        return LocationGroupListSerializer

    def update(self, request, *args, **kwargs):
        """Look up a location group by ``group_code`` and create it if missing.

        The payload is echoed back with ``month`` (current ``YYYYMM``) and the
        ``id`` of the existing or newly created group filled in.

        :raises rest_framework.exceptions.ValidationError: when a new group has
            to be created and the payload does not validate
        """
        data = _mutable_payload(request)
        data['month'] = str(timezone.now().strftime('%Y%m'))
        group_code = data.get('group_code')

        # Resolve the location group object.
        group_obj = LocationGroupModel.objects.filter(group_code=group_code).first()
        if group_obj:
            data['id'] = group_obj.id
            logger.info(f"库位组 {group_code} 已存在")
        else:
            logger.info(f"库位组 {group_code} 不存在,创建库位组对象")
            serializer_list = LocationGroupPostSerializer(data=data)
            serializer_list.is_valid(raise_exception=True)
            serializer_list.save()
            data['id'] = serializer_list.data.get('id')

        # NOTE(review): 201 is returned even when the group already existed;
        # kept as-is because existing clients may rely on it.
        return Response(data, status=status.HTTP_201_CREATED)