from rest_framework.viewsets import ViewSet from rest_framework import viewsets from utils.page import MyPageNumberPagination from django.db.models import Prefetch from rest_framework.filters import OrderingFilter from django_filters.rest_framework import DjangoFilterBackend from rest_framework.response import Response from django.db.models import F, Case, When from django.db.models import OuterRef, Subquery from django.utils import timezone import requests from django.db import transaction import logging from rest_framework import status from .models import ContainerListModel,ContainerDetailModel,ContainerOperationModel,ContainerWCSModel,TaskModel,out_batch_detail,ContainerDetailLogModel,batchLogModel,WCSTaskLogModel from bound.models import BoundDetailModel,BoundListModel,OutBoundDetailModel from bin.views import LocationAllocation,base_location from bin.models import LocationModel,LocationContainerLink,LocationGroupModel from bound.models import BoundBatchModel,OutBatchModel,BatchOperateLogModel from django.conf import settings from rest_framework.views import APIView from rest_framework.response import Response as DRFResponse import os import re from .serializers import ContainerDetailGetSerializer,ContainerDetailPostSerializer,ContainerDetailSimpleGetSerializer,ContainerDetailPutSerializer from .serializers import ContainerListGetSerializer,ContainerListPostSerializer from .serializers import ContainerOperationGetSerializer,ContainerOperationPostSerializer from .serializers import TaskGetSerializer,TaskPostSerializer from .serializers import WCSTaskGetSerializer,WCSTaskLogSerializer from .serializers import OutBoundFullDetailSerializer,OutBoundDetailSerializer from .serializers import ContainerDetailLogSerializer from .serializers import batchLogModelSerializer from .filter import ContainerDetailFilter,ContainerListFilter,ContainerOperationFilter,TaskFilter,WCSTaskFilter,ContainerDetailLogFilter,batchLogFilter,WCSTaskLogFilter from rest_framework.permissions import AllowAny import threading from django.db import close_old_connections from bin.services import AllocationService from collections import defaultdict from django.db.models import Sum from staff.models import ListModel as StaffListModel from operation_log.views import log_success_operation, log_failure_operation, log_operation logger = logging.getLogger(__name__) loggertask = logging.getLogger('wms.WCSTask') from .models import DispatchConfig from .serializers import DispatchConfigSerializer DEFAULT_LOCATION_GROUP_ID = 0 DEFAULT_ORDER_NUMBER = 0 LOCATION_CODE_REGEX = re.compile(r'([A-Z0-9]+-L\d+C\d{3}-\d{2})', re.IGNORECASE) GROUP_CODE_REGEX = re.compile(r'([A-Z0-9]+-L\d+C\d{3})', re.IGNORECASE) COORDINATE_SUFFIX_REGEX = re.compile(r'(\d+)-(\d+)-(\d+)$') def select_reference_location(task_type, current_location, target_location): """ 根据任务类型确定需要解析的库位:入库/移库看目标,其余看当前 """ normalized = str(task_type or '').lower() if normalized in ('inbound', 'move', 'putaway'): return target_location or current_location return current_location or target_location def _find_location_instance(location_str): if not location_str: return None normalized = str(location_str).strip() if not normalized: return None candidates = {normalized, normalized.upper()} regex_match = LOCATION_CODE_REGEX.search(normalized) if regex_match: candidates.add(regex_match.group(1).upper()) for candidate in candidates: location = LocationModel.objects.filter( location_code=candidate ).only('id', 'location_group', 'c_number').first() if location: return location coordinate_match = COORDINATE_SUFFIX_REGEX.search(normalized) if coordinate_match: row, col, layer = coordinate_match.groups() try: return LocationModel.objects.filter( row=int(row), col=int(col), layer=int(layer) ).only('id', 'location_group', 'c_number').first() except ValueError: return None return None def _extract_group_code(location_str): if not location_str: return None match = GROUP_CODE_REGEX.search(location_str) if match: return match.group(1).upper() parts = str(location_str).split('-') if len(parts) > 1 and parts[-1].isdigit(): candidate = '-'.join(parts[:-1]) if GROUP_CODE_REGEX.match(candidate): return candidate.upper() return None def resolve_location_group_metadata(location_value): defaults = { 'location_group_id': DEFAULT_LOCATION_GROUP_ID, 'order_number': DEFAULT_ORDER_NUMBER, } if location_value is None: return defaults.copy() location_str = str(location_value).strip() if not location_str: return defaults.copy() location_instance = _find_location_instance(location_str) if location_instance: group_code = location_instance.location_group group = LocationGroupModel.objects.filter( group_code=group_code ).only('id').first() return { 'location_group_id': group.id if group else defaults['location_group_id'], 'order_number': ( location_instance.c_number ) or defaults['order_number'], } group_code = _extract_group_code(location_str) if group_code: group = LocationGroupModel.objects.filter( group_code=group_code ).only('id').first() if group: return { 'location_group_id': group.id, 'order_number': defaults['order_number'], } return defaults.copy() def get_task_location_metadata(task_type, current_location, target_location): reference_location = select_reference_location(task_type, current_location, target_location) return resolve_location_group_metadata(reference_location) # 托盘分类视图 # 借助LocationContainerLink,其中 # 库位-托盘关联表(记录实时存放关系) # class LocationContainerLink(models.Model): # location = models.ForeignKey( # LocationModel, # on_delete=models.CASCADE, # related_name='container_links', # verbose_name='库位' # ) # container = models.ForeignKey( # ContainerListModel, # on_delete=models.CASCADE, # related_name='location_links', # verbose_name='关联托盘' # ) # task_wcs = models.ForeignKey(ContainerWCSModel, on_delete=models.CASCADE, null=True, blank=True) # task_detail = models.ForeignKey(TaskModel, on_delete=models.CASCADE, null=True, blank=True) # put_time = models.DateTimeField(auto_now_add=True, verbose_name='上架时间') # operator = models.CharField(max_length=50, verbose_name='操作人') # is_active = models.BooleanField(default=True, verbose_name='是否有效') # 借助LocationContainerLink中的is_active字段,可以查询托盘是否在库位中, # 若is_active为True,则托盘在库位中,否则托盘不在库位中。同时,再增加字段来显示在库托盘中的存储的物料信息。 # class containerclassViewSet(viewsets.ModelViewSet): # 托盘流水汇总批次流水 class batchLogModelViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = batchLogFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return batchLogModel.objects.filter() else: return batchLogModel.objects.filter(id=id) else: return batchLogModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return batchLogModelSerializer else: return self.http_method_not_allowed(request=self.request) def create(self, request, *args, **kwargs): data = self.request.data return Response(data, status=200) def update(self, request, pk): qs = self.get_object() data = self.request.data serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=200, headers=headers) def get_container_operation_log(self,request): batchlog_id = self.request.query_params.get('batchlog_id') batch_obj = batchLogModel.objects.get(id=batchlog_id) container_operation_log = batch_obj.detail_logs.all() serializer = ContainerDetailLogSerializer(container_operation_log, many=True) return Response(serializer.data, status=200) # 进出库log查看 class ContainerDetailLogModelViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = ContainerDetailLogFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return ContainerDetailLogModel.objects.filter() else: return ContainerDetailLogModel.objects.filter(id=id) else: return ContainerDetailLogModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return ContainerDetailLogSerializer else: return self.http_method_not_allowed(request=self.request) def create(self, request, *args, **kwargs): data = self.request.data return Response(data, status=200) def update(self, request, pk): qs = self.get_object() data = self.request.data serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=200, headers=headers) # 托盘列表视图 class ContainerListViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = ContainerListFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return ContainerListModel.objects.filter() else: return ContainerListModel.objects.filter(id=id) else: return ContainerListModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return ContainerListGetSerializer elif self.action in ['create', 'update']: return ContainerListPostSerializer else: return self.http_method_not_allowed(request=self.request) def create(self, request, *args, **kwargs): # 创建托盘:托盘码五位数字(唯一),当前库位,目标库位,状态,最后操作时间 container_all = ContainerListModel.objects.all().order_by('container_code') if container_all.count() == 0: container_code = 12345 else: container_code = container_all.last().container_code + 1 container_obj = ContainerListModel.objects.create( container_code=container_code, current_location='N/A', target_location='N/A', status=0, last_operation=timezone.now() ) serializer = ContainerListGetSerializer(container_obj) headers = self.get_success_headers(serializer.data) try: log_success_operation( request=self.request, operation_content=f"创建托盘列表:{container_code}", operation_level="new", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, object_id=container_obj.id, module_name="托盘管理" ) except Exception as e: pass return Response(serializer.data, status=201, headers=headers) def update(self, request, pk): qs = self.get_object() data = self.request.data serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) try: log_success_operation( request=self.request, operation_content=f"更新托盘列表 ID:{pk}", operation_level="update", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, object_id=pk, module_name="托盘管理" ) except Exception as e: pass return Response(serializer.data, status=200, headers=headers) def check_container_postion(self, request, *args, **kwargs): # 获取查询集 container_list = ContainerListModel.objects.exclude(current_location=F('target_location')) # 手动应用分页 page = self.paginate_queryset(container_list) if page is not None: serializer = ContainerListGetSerializer(page, many=True) return self.get_paginated_response(serializer.data) # 如果没有分页,返回完整结果(不推荐) serializer = ContainerListGetSerializer(container_list, many=True) return Response(serializer.data, status=200) def update_container_categories(self, request, *args, **kwargs): from .utils import update_container_categories_task update_container_categories_task() return Response({'message': '托盘分类更新任务已触发'}, status=200) # wcs任务视图 class WCSTaskViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['-id', "-create_time", "update_time", ] filter_class = WCSTaskFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: base_qs = ContainerWCSModel.objects.select_related( 'batch', 'batch_out', 'batch_out__batch_number', 'batch_out__bound_list', 'bound_list' ) if id is None: return base_qs else: return base_qs.filter(id=id) else: return ContainerWCSModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return WCSTaskGetSerializer else: return self.http_method_not_allowed(request=self.request) def send_task_to_wcs(self, request, *args, **kwargs): data = self.request.data task_id = data.get('taskid') logger.info(f"请求任务:{task_id}") data_return = {} try: task_obj = ContainerWCSModel.objects.get(id=task_id) if not task_obj: data_return = { 'code': '400', 'message': '任务不存在', 'data': data } log_failure_operation( request=self.request, operation_content=f"任务不存在:{task_id}", operation_level="other", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, module_name="WCS任务" ) return Response(data_return, status=status.HTTP_400_BAD_REQUEST) if task_obj.working == 1: OutboundService.send_task_to_wcs(task_obj) data_return = { 'code': '200', 'message': '任务已在执行中,再次下发', 'data': data } else: data_return = { 'code': '200', 'message': '任务已执行完成', 'data': data } except ContainerWCSModel.DoesNotExist: data_return = { 'code': '404', 'message': '任务不存在', 'data': data } log_failure_operation( request=self.request, operation_content=f"任务下发失败:{task_id}", operation_level="other", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, module_name="WCS任务" ) log_success_operation( request=self.request, operation_content=f"任务下发成功:{task_id}", operation_level="other", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, module_name="WCS任务" ) return Response(data_return, status=status.HTTP_200_OK) # 入库任务视图 class TaskViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = TaskFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return TaskModel.objects.filter() else: return TaskModel.objects.filter(id=id) else: return TaskModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return TaskGetSerializer elif self.action in ['create', 'update']: return TaskPostSerializer else: return self.http_method_not_allowed(request=self.request) def create(self, request, *args, **kwargs): data = self.request.data return Response(data, status=200) def update(self, request, pk): qs = self.get_object() data = self.request.data serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=200, headers=headers) # 任务回滚 class TaskRollbackMixin: def rollback_task(self, request, task_id, *args, **kwargs): """ 撤销入库任务并回滚相关状态 """ try: # 获取任务实例并锁定数据库记录 task = ContainerWCSModel.objects.get(taskid=task_id) container_code = task.container target_location = task.target_location batch = task.batch # 初始化库位分配器 allocator = LocationAllocation() # ==================== 库位状态回滚 ==================== # 解析目标库位信息(格式:仓库代码-行-列-层) try: warehouse_code, row, col, layer = target_location.split('-') location = LocationModel.objects.get( warehouse_code=warehouse_code, row=int(row), col=int(col), layer=int(layer) ) # 回滚库位状态到可用状态 allocator.update_location_status(location.location_code, 'available') # 更新库位组状态(需要根据实际逻辑实现) allocator.update_location_group_status(location.location_code) # 解除库位与托盘的关联 allocator.update_location_container_link(location.location_code, None) # 清除库位组的批次关联 allocator.update_location_group_batch(location, None) except (ValueError, LocationModel.DoesNotExist) as e: logger.error(f"库位解析失败: {str(e)}") raise Exception("关联库位信息无效") # ==================== 批次状态回滚 ==================== if batch: # 将批次状态恢复为未处理状态(假设原状态为1) allocator.update_batch_status(batch.bound_number, '1') # ==================== 托盘状态回滚 ==================== container_obj = ContainerListModel.objects.get(container_code=container_code) # 恢复托盘详细状态为初始状态(假设原状态为1) allocator.update_container_detail_status(container_code, 1) # 恢复托盘的目标位置为当前所在位置 container_obj.target_location = task.current_location container_obj.save() # ==================== 删除任务记录 ==================== task.delete() # ==================== 其他关联清理 ==================== # 如果有其他关联数据(如inport_update_task的操作),在此处添加清理逻辑 return Response( {'code': '200', 'message': '任务回滚成功', 'data': None}, status=status.HTTP_200_OK ) except ContainerWCSModel.DoesNotExist: logger.warning(f"任务不存在: {task_id}") return Response( {'code': '404', 'message': '任务不存在', 'data': None}, status=status.HTTP_404_NOT_FOUND ) except Exception as e: logger.error(f"任务回滚失败: {str(e)}", exc_info=True) return Response( {'code': '500', 'message': '服务器内部错误', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) # 入库任务下发 class ContainerWCSViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ authentication_classes = [] # 禁用所有认证类 permission_classes = [AllowAny] # 允许任意访问 def generate_move_task(self, request, *args, **kwargs): data = self.request.data container = data.get('container_code') start_location = data.get('start_location') target_location = data.get('target_location') logger.info(f"移库请求托盘:{container},起始位置:{start_location},目标位置:{target_location}") data_return = {} try: container_obj = ContainerListModel.objects.filter(container_code=container).first() if not container_obj: data_return = { 'code': '400', 'message': '托盘编码不存在', 'data': data } # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"移库任务创建失败:托盘编码不存在 - {container}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS移库管理" ) except Exception as log_error: pass return Response(data_return, status=status.HTTP_400_BAD_REQUEST) # 检查是否已在目标位置 if target_location == str(container_obj.target_location) and target_location!= '203' and target_location!= '103': logger.info(f"托盘 {container} 已在目标位置") data_return = { 'code': '200', 'message': '当前位置已是目标位置', 'data': data } else: # 生成任务 current_task = ContainerWCSModel.objects.filter( container=container, tasktype='inbound', working = 1, ).exclude(status=300).first() if current_task: data_return = { 'code': '200', 'message': '任务已存在,重新下发', 'data': current_task.to_dict() } else: # todo: 这里的入库操作记录里面的记录的数量不对 location_min_value,allocation_target_location, batch_info = AllocationService._move_allocation(start_location, target_location,container) batch_id = batch_info['number'] if batch_info['class'] == 2: self.generate_task_no_batch(container, start_location, allocation_target_location,batch_id,location_min_value.c_number) self.generate_container_operate_no_batch(container_obj, batch_id, allocation_target_location) elif batch_info['class'] == 3: self.generate_task_no_batch(container, start_location, allocation_target_location,batch_id,location_min_value.c_number) self.generate_move_container_operate(container_obj, allocation_target_location) else: self.generate_task(container, start_location, allocation_target_location,batch_id,location_min_value.c_number) # 生成任务 self.generate_move_container_operate(container_obj, allocation_target_location) current_task = ContainerWCSModel.objects.get( container=container, tasktype='inbound', working=1, ) OutboundService.send_task_to_wcs(current_task) data_return = { 'code': '200', 'message': '任务下发成功', 'data': current_task.to_dict() } container_obj.target_location = allocation_target_location container_obj.save() if batch_info['class'] == 1 or batch_info['class'] == 3: self.inport_update_task(current_task.id, container_obj.id) http_status = status.HTTP_200_OK if data_return['code'] == '200' else status.HTTP_400_BAD_REQUEST loggertask.info(f"任务号:{current_task.tasknumber-20000000000},移库请求托盘:{container},起始位置:{start_location},目标位置:{target_location},返回结果:{data_return}") # 记录操作日志 try: if data_return['code'] == '200': task_info = data_return.get('data', {}) task_id = task_info.get('tasknumber', '未知') if isinstance(task_info, dict) else '未知' log_success_operation( request=request, operation_content=f"移库任务创建成功:托盘 {container},起始位置:{start_location},目标位置:{target_location},任务号:{task_id}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", object_id=container_obj.id if container_obj else None, module_name="WCS移库管理" ) else: log_failure_operation( request=request, operation_content=f"移库任务创建失败:托盘 {container},错误:{data_return.get('message', '未知错误')}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS移库管理" ) except Exception as log_error: pass return Response(data_return, status=http_status) except Exception as e: logger.error(f"处理请求时发生错误: {str(e)}", exc_info=True) # 记录异常日志 try: log_failure_operation( request=request, operation_content=f"移库任务处理异常:托盘 {container},错误:{str(e)}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS移库管理" ) except Exception as log_error: pass return Response( {'code': '500', 'message': '服务器内部错误', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def generate_out_task(self, request, *args, **kwargs): data = self.request.data container = data.get('container_code') start_location = data.get('current_location') target_location = data.get('target_location') logger.info(f"出库请求托盘:{container},起始位置:{start_location},目标位置:{target_location}") data_return = {} try: container_obj = ContainerListModel.objects.filter(container_code=container).first() if not container_obj: data_return = { 'code': '400', 'message': '托盘编码不存在', 'data': data } # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"出库任务创建失败:托盘编码不存在 - {container}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库管理" ) except Exception as log_error: pass return Response(data_return, status=status.HTTP_400_BAD_REQUEST) # 检查是否已在目标位置 if target_location == str(container_obj.target_location) : logger.info(f"托盘 {container} 已在目标位置") data_return = { 'code': '200', 'message': '当前位置已是目标位置', 'data': data } else: # 生成任务 current_task = ContainerWCSModel.objects.filter( container=container, tasktype='outbound', working = 1, ).exclude(status=300).first() if current_task: data_return = { 'code': '200', 'message': '任务已存在,重新下发', 'data': current_task.to_dict() } else: # todo: 这里的入库操作记录里面的记录的数量不对 allocation_target_location, batch_info = AllocationService._out_allocation(start_location, target_location,container) batch_id = batch_info['number'] if batch_info['class'] == 2: self.generate_task_no_batch(container, start_location, allocation_target_location,batch_id,1,'outbound') self.generate_container_operate_no_batch(container_obj, batch_id, allocation_target_location,"outbound") elif batch_info['class'] == 3: self.generate_task_no_batch(container, start_location, allocation_target_location,batch_id,1,'outbound') self.generate_move_container_operate(container_obj, allocation_target_location,"outbound") else: self.generate_task(container, start_location, allocation_target_location,batch_id,1,'outbound') # 生成任务 self.generate_move_container_operate(container_obj, allocation_target_location,"outbound") current_task = ContainerWCSModel.objects.get( container=container, tasktype='outbound', working=1, ) OutboundService.send_task_to_wcs(current_task) data_return = { 'code': '200', 'message': '任务下发成功', 'data': current_task.to_dict() } container_obj.target_location = allocation_target_location container_obj.save() if batch_info['class'] == 1 or batch_info['class'] == 3: self.inport_update_task(current_task.id, container_obj.id) http_status = status.HTTP_200_OK if data_return['code'] == '200' else status.HTTP_400_BAD_REQUEST loggertask.info(f"任务号:{current_task.tasknumber-20000000000},出库请求托盘:{container}返回结果:{data_return}") # 记录操作日志 try: if data_return['code'] == '200': task_info = data_return.get('data', {}) task_id = task_info.get('tasknumber', '未知') if isinstance(task_info, dict) else '未知' log_success_operation( request=request, operation_content=f"出库任务创建成功:托盘 {container},起始位置:{start_location},目标位置:{target_location},任务号:{task_id}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", object_id=container_obj.id if container_obj else None, module_name="WCS出库管理" ) else: log_failure_operation( request=request, operation_content=f"出库任务创建失败:托盘 {container},错误:{data_return.get('message', '未知错误')}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库管理" ) except Exception as log_error: pass return Response(data_return, status=http_status) except Exception as e: logger.error(f"处理请求时发生错误: {str(e)}", exc_info=True) # 记录异常日志 try: log_failure_operation( request=request, operation_content=f"出库任务处理异常:托盘 {container},错误:{str(e)}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库管理" ) except Exception as log_error: pass return Response( {'code': '500', 'message': '服务器内部错误', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def get_container_wcs(self, request, *args, **kwargs): data = self.request.data container = data.get('container_number') current_location = data.get('current_location') logger.info(f"入库请求托盘:{container},请求位置:{current_location}") data_return = {} try: container_obj = ContainerListModel.objects.filter(container_code=container).first() if not container_obj: data_return = { 'code': '400', 'message': '托盘编码不存在', 'data': data } # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"入库请求失败:托盘编码不存在 - {container}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS入库管理" ) except Exception as log_error: pass return Response(data_return, status=status.HTTP_400_BAD_REQUEST) # 更新托盘数据(部分更新) serializer = ContainerListPostSerializer( container_obj, data=data, partial=True # 允许部分字段更新 ) serializer.is_valid(raise_exception=True) serializer.save() # 检查是否已在目标位置 if current_location == str(container_obj.target_location) and current_location!= '203' and current_location!= '103': logger.info(f"托盘 {container} 已在目标位置") data_return = { 'code': '200', 'message': '当前位置已是目标位置', 'data': data } else: current_task = ContainerWCSModel.objects.filter( container=container, tasktype='inbound', working = 1, ).exclude(status=300).first() if current_task: data_return = { 'code': '200', 'message': '任务已存在,重新下发', 'data': current_task.to_dict() } else: # todo: 这里的入库操作记录里面的记录的数量不对 location_min_value,allocation_target_location, batch_info = AllocationService.allocate(container, current_location) batch_id = batch_info['number'] if batch_info['class'] == 2: self.generate_task_no_batch(container, current_location, allocation_target_location,batch_id,location_min_value.c_number) self.generate_container_operate_no_batch(container_obj, batch_id, allocation_target_location) elif batch_info['class'] == 3: self.generate_task_no_batch(container, current_location, allocation_target_location,batch_id,location_min_value.c_number) self.generate_container_operate(container_obj, allocation_target_location) else: self.generate_task(container, current_location, allocation_target_location,batch_id,location_min_value.c_number) # 生成任务 self.generate_container_operate(container_obj, allocation_target_location) current_task = ContainerWCSModel.objects.get( container=container, tasktype='inbound', working=1, ) data_return = { 'code': '200', 'message': '任务下发成功', 'data': current_task.to_dict() } container_obj.target_location = allocation_target_location container_obj.save() if batch_info['class'] == 1 or batch_info['class'] == 3: self.inport_update_task(current_task.id, container_obj.id) http_status = status.HTTP_200_OK if data_return['code'] == '200' else status.HTTP_400_BAD_REQUEST # 尝试从返回数据中获取任务号用于日志 try: task_info = data_return.get('data', {}) if isinstance(task_info, dict) and 'taskNumber' in task_info: task_number = task_info['taskNumber'] - 20000000000 if isinstance(task_info['taskNumber'], int) else '未知' loggertask.info(f"任务号:{task_number},入库请求托盘:{container},起始位置:{current_location},返回结果:{data_return}") except Exception: loggertask.info(f"入库请求托盘:{container},起始位置:{current_location},返回结果:{data_return}") # 记录操作日志 try: if data_return['code'] == '200': task_info = data_return.get('data', {}) task_id = task_info.get('taskNumber', '未知') if isinstance(task_info, dict) else '未知' log_success_operation( request=request, operation_content=f"入库任务创建成功:托盘 {container},起始位置:{current_location},任务号:{task_id}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", object_id=container_obj.id if container_obj else None, module_name="WCS入库管理" ) else: log_failure_operation( request=request, operation_content=f"入库任务创建失败:托盘 {container},错误:{data_return.get('message', '未知错误')}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS入库管理" ) except Exception as log_error: pass return Response(data_return, status=http_status) except Exception as e: logger.error(f"处理请求时发生错误: {str(e)}", exc_info=True) # 记录异常日志 try: log_failure_operation( request=request, operation_content=f"入库请求处理异常:托盘 {container},错误:{str(e)}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS入库管理" ) except Exception as log_error: pass return Response( {'code': '500', 'message': '服务器内部错误', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) # def generate_container_operate(self, container_obj, bound_number,allocation_target_location): def generate_container_operate(self, container_obj, allocation_target_location): # 获取托盘中所有有效的批次明细(排除已删除和状态3的) container_detaillist = ContainerDetailModel.objects.filter( container=container_obj, is_delete=False ).exclude(status=3) # 优化查询 - 预取批次对象 container_detaillist = container_detaillist.select_related('batch') # 创建批次数量字典 batch_totals = {} for detail in container_detaillist: batch_id = detail.batch_id if batch_id not in batch_totals: batch_totals[batch_id] = { 'obj': detail.batch, # 批次对象 'total_qty': 0 } batch_totals[batch_id]['total_qty'] += detail.goods_qty # 当前月份(单次计算多次使用) current_month = int(timezone.now().strftime("%Y%m")) current_time = timezone.now() current_location = container_obj.current_location # 为每个批次创建操作记录 for batch_id, data in batch_totals.items(): batch_obj = data['obj'] goods_qty = data['total_qty'] ContainerOperationModel.objects.create( month=current_month, container=container_obj, goods_code=batch_obj.goods_code, goods_desc=batch_obj.goods_desc, operation_type="inbound", batch_id=batch_obj.id, goods_qty=goods_qty, goods_weight=goods_qty, from_location=current_location, to_location=allocation_target_location, timestamp=current_time, operator="WMS", memo=f"WCS入库: 批次: {batch_obj.bound_number}, 数量: {goods_qty}" # 使用实际托盘中的数量 ) def generate_move_container_operate(self, container_obj, allocation_target_location,operate_type="adjust"): # 获取托盘中所有有效的批次明细 container_detaillist = ContainerDetailModel.objects.filter( container=container_obj, is_delete=False ).exclude(status=3) # 优化查询 - 预取批次对象 container_detaillist = container_detaillist.select_related('batch') # 创建批次数量字典 batch_totals = {} for detail in container_detaillist: batch_id = detail.batch_id if batch_id not in batch_totals: batch_totals[batch_id] = { 'obj': detail.batch, # 批次对象 'total_qty': 0 } batch_totals[batch_id]['total_qty'] += detail.goods_qty # 获取当前时间和位置信息 current_month = int(timezone.now().strftime("%Y%m")) current_time = timezone.now() current_location = container_obj.current_location # 为每个批次创建操作记录 for batch_id, data in batch_totals.items(): batch_obj = data['obj'] goods_qty = data['total_qty'] ContainerOperationModel.objects.create( month=current_month, container=container_obj, goods_code=batch_obj.goods_code, goods_desc=batch_obj.goods_desc, operation_type=operate_type, # 操作类型改为移动 batch_id=batch_obj.id, goods_qty=goods_qty, goods_weight=goods_qty, from_location=current_location, to_location=allocation_target_location, timestamp=current_time, operator="WMS", memo=f"托盘移动: 批次: {batch_obj.bound_number}, 数量: {goods_qty}" ) def generate_move_container_operate_no_batch(self, container_obj, bound_number,allocation_target_location): ContainerOperationModel.objects.create( month = int(timezone.now().strftime("%Y%m")), container = container_obj, goods_code = 'container', goods_desc = '托盘组', operation_type ="adjust", goods_qty = 1, goods_weight = 0, from_location = container_obj.current_location, to_location= allocation_target_location, timestamp=timezone.now(), memo=f"托盘组移库:从{container_obj.current_location}移库到{allocation_target_location}" ) def generate_container_operate_no_batch(self, container_obj, bound_number,allocation_target_location,operate_type="inbound"): ContainerOperationModel.objects.create( month = int(timezone.now().strftime("%Y%m")), container = container_obj, goods_code = 'container', goods_desc = '托盘组', operation_type =operate_type, goods_qty = 1, goods_weight = 0, from_location = container_obj.current_location, to_location= allocation_target_location, timestamp=timezone.now(), memo=f"WCSs手动出库库: 批次: {bound_number}, 数量: 1" ) def generate_task(self, container, current_location, target_location,batch_id,location_c_number,tasktype='inbound'): batch = BoundBatchModel.objects.filter(bound_number=batch_id).first() batch_detail = BoundDetailModel.objects.filter(bound_batch=batch).first() if not batch: logger.error(f"批次号 {batch_id} 不存在") return False data_tosave = { 'container': container, 'batch': batch, 'batch_number': batch_id, 'batch_out': None, 'bound_list': batch_detail.bound_list, 'sequence': 1, 'order_number' :location_c_number, 'priority': 1, 'current_location': current_location, 'month': timezone.now().strftime('%Y%m'), 'target_location': target_location, 'tasktype': tasktype, 'status': 103, 'is_delete': False } data_tosave.update(get_task_location_metadata(tasktype, current_location, target_location)) # 生成唯一递增的 taskid last_task = ContainerWCSModel.objects.filter( month=data_tosave['month'], ).order_by('-tasknumber').first() if last_task: number_id = last_task.tasknumber + 1 new_id = f"{number_id:05d}" else: new_id = "00001" number_id = f"{data_tosave['month']}{new_id}" data_tosave['taskid'] = f"inbound-{data_tosave['month']}-{new_id}" logger.info(f"生成入库任务: {data_tosave['taskid']}") # 每月生成唯一递增的 taskNumber data_tosave['tasknumber'] = number_id ContainerWCSModel.objects.create(**data_tosave) def generate_task_no_batch(self, container, current_location, target_location,batch_id,location_c_number,tasktype='inbound'): data_tosave = { 'container': container, 'batch': None, 'batch_number': batch_id, 'batch_out': None, 'bound_list': None, 'sequence': 1, 'order_number' :location_c_number, 'priority': 1, 'current_location': current_location, 'month': timezone.now().strftime('%Y%m'), 'target_location': target_location, 'tasktype': tasktype, 'status': 103, 'is_delete': False } data_tosave.update(get_task_location_metadata(tasktype, current_location, target_location)) # 生成唯一递增的 taskid last_task = ContainerWCSModel.objects.filter( month=data_tosave['month'], ).order_by('-tasknumber').first() if last_task: number_id = last_task.tasknumber + 1 new_id = f"{number_id:05d}" else: new_id = "00001" number_id = f"{data_tosave['month']}{new_id}" data_tosave['taskid'] = f"inbound-{data_tosave['month']}-{new_id}" logger.info(f"生成入库任务: {data_tosave['taskid']}") # 每月生成唯一递增的 taskNumber data_tosave['tasknumber'] = number_id ContainerWCSModel.objects.create(**data_tosave) def update_container_wcs(self, request, *args, **kwargs): data = self.request.data loggertask.info(f"WCS返回任务: 任务号:{data.get('taskNumber')},请求托盘:{data.get('container_number')}, 请求位置:{data.get('current_location')},") try: # 前置校验 container_obj, error_response = self.validate_container(data) if error_response: # 记录验证失败日志 try: log_failure_operation( request=request, operation_content=f"WCS任务更新失败:托盘验证失败 - {data.get('container_number', '未知')}", operation_level="update", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS任务管理" ) except Exception as log_error: pass return error_response # 更新托盘数据 if not self.update_container_data(container_obj, data): return Response( {'code': '400', 'message': '数据更新失败', 'data': data}, status=status.HTTP_400_BAD_REQUEST ) # 处理位置逻辑 task = ContainerWCSModel.objects.filter( container=container_obj.container_code, tasktype='inbound' ).first() if self.is_already_at_target(container_obj, data.get('current_location')): loggertask.info(f"WMS返回数据:任务号:{task.tasknumber-20000000000},托盘 {container_obj.container_code} 已在目标位置") # 记录任务完成日志 try: task_number = data.get('taskNumber', '未知') log_success_operation( request=request, operation_content=f"WCS任务完成:托盘 {container_obj.container_code} 已到达目标位置,任务号:{task_number}", operation_level="update", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", object_id=container_obj.id, module_name="WCS任务管理" ) except Exception as log_error: pass return self.handle_target_reached(container_obj, data) elif task: data_return = { 'code': '200', 'message': '任务已存在,重新下发', 'data': task.to_dict() } loggertask.info(f"WMS返回数据:任务号:{task.tasknumber-20000000000},入库请求托盘:{container_obj.container_code},起始位置:{data.get('current_location')},目标位置:{container_obj.target_location},返回结果:{data_return}") return Response(data_return, status=status.HTTP_200_OK) else: return self.handle_new_allocation(container_obj, data) except Exception as e: logger.error(f"处理请求时发生错误: {str(e)}", exc_info=True) # 记录异常日志 try: log_failure_operation( request=request, operation_content=f"WCS任务更新异常:托盘 {data.get('container_number', '未知')},任务号:{data.get('taskNumber', '未知')},错误:{str(e)}", operation_level="update", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS任务管理" ) except Exception as log_error: pass return Response({'code': '500', 'message': '服务器内部错误', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # ---------- 辅助函数 ---------- def validate_container(self, data): """验证托盘是否存在""" container = data.get('container_number') container_obj = ContainerListModel.objects.filter(container_code=container).first() if not container_obj: return None, Response({ 'code': '400', 'message': '托盘编码不存在', 'data': data }, status=status.HTTP_400_BAD_REQUEST) return container_obj, None def update_container_data(self, container_obj, data): """更新托盘数据""" serializer = ContainerListPostSerializer( container_obj, data=data, partial=True ) if serializer.is_valid(): serializer.save() return True return False def is_already_at_target(self, container_obj, current_location): """检查是否已在目标位置""" # print (current_location) # print (str(container_obj.target_location)) return current_location == str(container_obj.target_location) def handle_target_reached(self, container_obj, data): """处理已到达目标位置的逻辑""" logger.info(f"托盘 {container_obj.container_code} 已在目标位置") task = self.get_task_by_tasknumber(data) self.update_pressure_values(task, container_obj) # if task.working == 1: # alloca = LocationAllocation() # alloca.update_batch_goods_in_location_qty(container_obj.container_code, 1) task = self.process_task_completion(data) if not task: return Response({'code': '400', 'message': '任务不存在', 'data': data}, status=status.HTTP_400_BAD_REQUEST) if task and task.tasktype == 'inbound': self.update_storage_system(container_obj) if task and task.tasktype == 'outbound' and task.status == 300: success = self.handle_outbound_completion(container_obj, task) if not success: return Response({'code': '500', 'message': '出库状态更新失败', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # WCS完成一条任务后,只下发一条新任务,优先同楼层 # 从刚完成的任务中提取楼层信息 preferred_layer = None try: parts = str(task.current_location).split('-') preferred_layer = parts[3] if len(parts) >= 4 else None except Exception: pass OutboundService.process_next_task(single_task=True, preferred_layer=preferred_layer) if task and task.tasktype == 'check' and task.status == 300: success = self.handle_outbound_completion(container_obj, task) if not success: return Response({'code': '500', 'message': '出库状态更新失败', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # WCS完成一条任务后,只下发一条新任务,优先同楼层 # 从刚完成的任务中提取楼层信息 preferred_layer = None try: parts = str(task.current_location).split('-') preferred_layer = parts[3] if len(parts) >= 4 else None except Exception: pass OutboundService.process_next_task(single_task=True, preferred_layer=preferred_layer) return Response({ 'code': '200', 'message': '当前位置已是目标位置', 'data': data }, status=status.HTTP_200_OK) def get_task_by_tasknumber(self, data): taskNumber = data.get('taskNumber') + 20000000000 task = ContainerWCSModel.objects.filter(tasknumber=taskNumber).first() if task: return task else: return None def process_task_completion(self, data): """处理任务完成状态""" taskNumber = data.get('taskNumber') + 20000000000 task = ContainerWCSModel.objects.filter(tasknumber=taskNumber).first() if task: task.status = 300 task.message = '任务已完成' task.working = 0 task.save() try: original_task_number = data.get('taskNumber') if original_task_number is not None: WCSTaskLogModel.objects.filter( task_number=original_task_number ).update(is_completed=True) except Exception as log_error: logger.warning(f"更新任务日志完成状态失败: {log_error}") return task def update_pressure_values(self, task, container_obj): """更新压力值计算""" if task : base_location_obj = base_location.objects.get(id=1) layer = int(container_obj.target_location.split('-')[-1]) pressure_field = f"layer{layer}_pressure" logger.info(f"更新压力值,压力字段:{pressure_field}") current_pressure = getattr(base_location_obj, pressure_field, 0) updated_pressure = max(current_pressure - task.working, 0) setattr(base_location_obj, pressure_field, updated_pressure) base_location_obj.save() def update_storage_system(self, container_obj): """更新仓储系统状态""" allocator = LocationAllocation() location_code = self.get_location_code(container_obj.target_location) # 链式更新操作 update_operations = [ (allocator.update_location_status, location_code, 'occupied'), (allocator.update_location_container_link, location_code, container_obj.container_code), (allocator.update_container_detail_status, container_obj.container_code, 2) ] for func, *args in update_operations: if not func(*args): logger.error(f"操作失败: {func.__name__}") return False return True def get_location_code(self, target_location): """从目标位置解析获取位置编码""" parts = target_location.split('-') coordinate = f"{int(parts[1])}-{int(parts[2])}-{int(parts[3])}" return LocationModel.objects.filter(coordinate=coordinate).first().location_code def handle_new_allocation(self, container_obj, data): """处理新库位分配逻辑""" allocator = LocationAllocation() container_code = container_obj.container_code # 获取并验证库位分配 location = allocator.get_location_by_status(container_code, data.get('current_location')) if not location or not self.perform_initial_allocation(allocator, location, container_code): return Response({'code': '400', 'message': '库位分配失败', 'data': data}, status=status.HTTP_400_BAD_REQUEST) # 生成目标位置并更新托盘 target_location = self.generate_target_location(location) container_obj.target_location = target_location container_obj.save() # 创建任务并返回响应 task = self.create_inbound_task(container_code, data, target_location, location) return Response({ 'code': '200', 'message': '任务下发成功', 'data': task.to_dict() }, status=status.HTTP_200_OK) def perform_initial_allocation(self, allocator, location, container_code): """执行初始库位分配操作""" operations = [ (allocator.update_location_status, location.location_code, 'reserved'), (allocator.update_location_group_status, location.location_code), (allocator.update_batch_status, container_code, '2'), (allocator.update_location_group_batch, location, container_code), (allocator.update_location_container_link, location.location_code, container_code), (allocator.update_container_detail_status, container_code, 2) ] for func, *args in operations: if not func(*args): logger.error(f"分配操作失败: {func.__name__}") return False return True def generate_target_location(self, location): """生成目标位置字符串""" return ( f"{location.warehouse_code}-" f"{int(location.row):02d}-" f"{int(location.col):02d}-" f"{int(location.layer):02d}" ) def create_inbound_task(self, container_code, data, target_location, location): """创建入库任务""" batch_id = LocationAllocation().get_batch(container_code) self.generate_task( container_code, data.get('current_location'), target_location, batch_id, location.c_number ) task = ContainerWCSModel.objects.get(container=container_code, tasktype='inbound') self.inport_update_task(task.id, container_code) return task def inport_update_task(self, wcs_id,container_id): try: task_obj = ContainerWCSModel.objects.filter(id=wcs_id).first() if task_obj: container_detail_obj = ContainerDetailModel.objects.filter(container=container_id,is_delete=False).all() if container_detail_obj: for detail in container_detail_obj: # 保存到数据库 batch = BoundDetailModel.objects.filter(bound_batch_id=detail.batch.id).first() TaskModel.objects.create( task_wcs = task_obj, container_detail = detail, batch_detail = batch ) logger.info(f"入库任务 {wcs_id} 已更新") else: logger.info(f"入库任务 {container_id} 批次不存在") else: logger.info(f"入库任务 {wcs_id} 不存在") except Exception as e: logger.error(f"处理入库任务时发生错误: {str(e)}", exc_info=True) return Response( {'code': '500', 'message': '服务器内部错误', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def release_location(self,request): """释放库位""" """添加权限管理""" token = self.request.META.get('HTTP_TOKEN') appid = self.request.META.get('HTTP_APPID') if not token: return Response({'code': '401', 'message': '请登录', 'data': None}, status=status.HTTP_200_OK) user = StaffListModel.objects.filter(openid=token,appid=appid).first() if not user: return Response({'code': '401', 'message': '请登录', 'data': None}, status=status.HTTP_200_OK) if user.staff_type not in ['Supervisor', 'Manager','Admin','管理员','经理','主管']: return Response({'code': '401', 'message': '无权限', 'data': None}, status=status.HTTP_200_OK) try: location_release = request.data.get('location_release') location_row = location_release.split('-')[1] location_col = location_release.split('-')[2] location_layer = location_release.split('-')[3] location= LocationModel.objects.filter(row=location_row, col=location_col, layer=location_layer).first() location_code = location.location_code allocator = LocationAllocation() with transaction.atomic(): if not allocator.release_location(location_code): raise Exception("解除库位关联失败") if not allocator.update_location_status(location_code, 'available'): raise Exception("库位状态更新失败") if not allocator.update_location_group_status(location_code): raise Exception("库位组状态更新失败") # 记录成功日志 try: log_success_operation( request=request, operation_content=f"释放库位成功:库位编码 {location_code},库位位置 {location_release}", operation_level="update", operator=user.staff_name if user else None, object_id=location.id if location else None, module_name="WCS库位管理" ) except Exception as log_error: pass except Exception as e: logger.error(f"解除库位关联失败: {str(e)}") # 记录失败日志 try: location_info = request.data.get('location_release', '未知') if hasattr(request, 'data') else '未知' log_failure_operation( request=request, operation_content=f"释放库位失败:库位位置 {location_info},错误:{str(e)}", operation_level="update", operator=user.staff_name if user else None, module_name="WCS库位管理" ) except Exception as log_error: pass return Response({'code': '500', 'message': e, 'data': None}, status=status.HTTP_200_OK) return Response({'code': '200', 'message': '解除库位关联成功', 'data': None}, status=status.HTTP_200_OK) def handle_outbound_completion(self, container_obj, task): """处理出库完成后的库位释放和状态更新""" try: allocator = LocationAllocation() location_task = task.current_location if location_task == '103' or location_task == '203': return True else: location_row = location_task.split('-')[1] location_col = location_task.split('-')[2] location_layer = location_task.split('-')[3] location= LocationModel.objects.filter(row=location_row, col=location_col, layer=location_layer).first() location_code = location.location_code # 事务确保原子性 with transaction.atomic(): # 解除库位与托盘的关联 if not allocator.release_location(location_code): raise Exception("解除库位关联失败") # 更新库位状态为可用 if not allocator.update_location_status(location_code, 'available'): raise Exception("库位状态更新失败") # 更新库位组的统计信息 self.handle_group_location_status(location_code, location.location_group) # 更新托盘状态为已出库(假设状态3表示已出库) container_obj.status = 3 container_obj.save() return True except Exception as e: logger.error(f"出库完成处理失败: {str(e)}") return False def handle_group_location_status(self,location_code,location_group): """ 处理库位组和库位的关联关系 :param location_code: 库位编码 :param location_group: 库位组编码 :return: """ # 1. 获取库位空闲状态的库位数目 location_obj_number = LocationModel.objects.filter( location_group=location_group, status='available' ).all().count() # 2. 获取库位组对象 logger.info(f"库位组 {location_group} 下的库位数目:{location_obj_number}") # 1. 获取库位和库位组的关联关系 location_group_obj = LocationGroupModel.objects.filter( group_code=location_group ).first() if not location_group_obj: logger.info(f"库位组 {location_group} 不存在") return None else: if location_obj_number == 0: # 库位组库位已满,更新库位组状态为full location_group_obj.status = 'full' location_group_obj.save() elif location_obj_number < location_group_obj.max_capacity: location_group_obj.status = 'occupied' location_group_obj.save() else: location_group_obj.status = 'available' location_group_obj.current_batch = '' location_group_obj.current_goods_code = '' location_group_obj.save() # PDA组盘入库 将扫描到的托盘编码和批次信息保存到数据库 # 1. 先查询托盘对象,如果不存在,则创建托盘对象 # 2. 循环处理每个批次,查询批次对象, # 3. 更新批次数据(根据业务规则) # 4. 保存到数据库 # 5. 保存操作记录到数据库 class ContainerDetailViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = ContainerDetailFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return ContainerDetailModel.objects.filter( is_delete=False) else: return ContainerDetailModel.objects.filter( id=id, is_delete=False) else: return ContainerDetailModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return ContainerDetailGetSerializer elif self.action in ['create']: return ContainerDetailPostSerializer elif self.action in ['update']: return ContainerDetailPutSerializer else: return self.http_method_not_allowed(request=self.request) def create(self, request, *args, **kwargs): data = self.request.data from .container_operate import ContainerService ContainerService.create_container_operation(data,logger=logger) # 将处理后的数据返回(或根据业务需求保存到数据库) res_data={ "code": "200", "msg": "Success Create", "data": data } try: log_success_operation( request=self.request, operation_content=f"创建托盘操作:{data.get('container_code', '未知')}", operation_level="new", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, module_name="托盘管理" ) except Exception as e: pass return Response(res_data, status=200) def update(self, request, pk): qs = self.get_object() data = self.request.data serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) try: log_success_operation( request=self.request, operation_content=f"更新托盘详情 ID:{pk}", operation_level="update", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, object_id=pk, module_name="托盘管理" ) except Exception as e: pass return Response(serializer.data, status=200, headers=headers) def destroy(self, request, pk): qs = self.get_object() qs.is_delete = True qs.save() try: log_success_operation( request=self.request, operation_content=f"删除托盘详情 ID:{pk}", operation_level="delete", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, object_id=pk, module_name="托盘管理" ) except Exception as e: pass return Response({'code': 200,'message': '删除成功', 'data': None}, status=200) def containerdetail_list(self, request): """ 获取托盘详情列表 """ try: container_id = request.query_params.get('container') if not container_id: return Response( {'code': 400, 'message': '缺少托盘ID参数', 'data': None}, status=status.HTTP_400_BAD_REQUEST ) # 获取托盘对象 try: container = ContainerListModel.objects.get(id=container_id) except ContainerListModel.DoesNotExist: return Response( {'code': 404, 'message': '指定托盘不存在', 'data': None}, status=status.HTTP_404_NOT_FOUND ) # 查询关联批次明细(排除状态0和3) details = ContainerDetailModel.objects.filter( container=container,is_delete=False ).exclude( status__in=[0, 3] ).select_related('batch') details_serializer = ContainerDetailSimpleGetSerializer(details, many=True) return Response( {'code': 200, 'message': 'Success', 'data': details_serializer.data}, status=status.HTTP_200_OK ) except Exception as e: return Response( {'code': 500, 'message': f'服务器错误: {str(e)}', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def locationdetail_list(self, request): """ 获取库位所处托盘的信息(按批次号 + 数量分组) 新增批次总量统计功能 """ try: container_id = request.query_params.get('container') if not container_id: return Response( {'code': 400, 'message': '缺少托盘ID参数', 'data': None}, status=status.HTTP_400_BAD_REQUEST ) # 获取托盘对象 try: container = ContainerListModel.objects.get(id=container_id) except ContainerListModel.DoesNotExist: return Response( {'code': 404, 'message': '指定托盘不存在', 'data': None}, status=status.HTTP_404_NOT_FOUND ) # 查询关联批次明细(排除状态0和3) details = ContainerDetailModel.objects.filter( container=container,is_delete=False ).exclude( status__in=[0, 3] ).select_related('batch') if not details.exists(): return Response( {'code': 404, 'message': '未找到有效批次数据', 'data': None}, status=status.HTTP_404_NOT_FOUND ) # 按批次号 + 数量分组统计 batch_dict = {} batch_qty_dict = defaultdict(int) # 使用默认字典自动初始化 for detail in details: if not detail.batch: continue bound_number = detail.batch.bound_number goods_qty = detail.goods_qty - detail.goods_out_qty # 剔除出库数量 # 组合键:批次号 + 当前数量 batch_key = (bound_number, goods_qty) batch_qty_dict[bound_number] += goods_qty # 自动处理键初始化 # 分组统计 if batch_key not in batch_dict: batch_obj = BoundBatchModel.objects.filter( bound_number=bound_number).first() batch_dict[batch_key] = { "goods_code": detail.goods_code, "goods_desc": detail.goods_desc, "goods_qty": goods_qty, "goods_class": detail.goods_class, "goods_package": batch_obj.goods_package, "batch_total_qty": batch_obj.goods_qty, "batch_total_in_qty": batch_obj.goods_in_qty - batch_obj.goods_out_qty, "status": detail.status, "group_qty": 1, "create_time": detail.create_time, } else: batch_dict[batch_key]["group_qty"] += 1 # 重构数据结构 results = [] for (bound_number, qty), data in batch_dict.items(): results.append({ **data, "bound_number": bound_number, "current_qty": qty, "total_batch_qty": batch_qty_dict[bound_number] # 添加批次总量 }) batch_totals =[] for bound_number, qty in batch_qty_dict.items(): batch_totals.append({ "bound_number": bound_number, "total_batch_qty": qty }) return Response( { "code": 200, "message": "Success", "data": { "container_code": container.container_code, "count": len(results), "results": results, "batch_totals": batch_totals # 可选:单独返回批次总量 } }, status=status.HTTP_200_OK ) except Exception as e: return Response( {'code': 500, 'message': f'服务器错误: {str(e)}', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def pdadetail_list(self, request): """ 获取PDA组盘入库的托盘详情列表 """ try: container_code = request.query_params.get('container_code') if not container_code: return Response( {'code': 400, 'message': '缺少托盘编码参数', 'data': None}, status=status.HTTP_200_OK ) # 获取托盘对象 try: container = ContainerListModel.objects.get(container_code=container_code) except ContainerListModel.DoesNotExist: return Response( {'code': 404, 'message': '指定托盘不存在', 'data': None}, status=status.HTTP_200_OK ) # 查询关联批次明细(排除状态0和3) details = ContainerDetailModel.objects.filter( container=container,is_delete=False ).exclude( status__in=[0, 3] ).select_related('batch') if not details.exists(): return Response( {'code': 404, 'message': '未找到有效批次数据', 'data': None}, status=status.HTTP_200_OK ) # 按批次号 + 数量分组统计 batch_dict = {} batch_qty_dict = defaultdict(int) # 使用默认字典自动初始化 for detail in details: if not detail.batch: continue bound_number = detail.batch.bound_number goods_qty = detail.goods_qty - detail.goods_out_qty # 剔除出库数量 # 组合键:批次号 + 当前数量 batch_key = (bound_number, goods_qty) batch_qty_dict[bound_number] += goods_qty # 自动处理键初始化 # 分组统计 if batch_key not in batch_dict: batch_obj = BoundBatchModel.objects.filter( bound_number=bound_number).first() batch_dict[batch_key] = { "goods_code": detail.goods_code, "goods_desc": detail.goods_desc, "goods_qty": goods_qty, "goods_class": detail.goods_class, "goods_package": batch_obj.goods_package, "batch_total_qty": batch_obj.goods_qty, "batch_total_in_qty": batch_obj.goods_in_qty - batch_obj.goods_out_qty, "status": detail.status, "group_qty": 1, "create_time": detail.create_time, } else: batch_dict[batch_key]["group_qty"] += 1 # 重构数据结构 results = [] for (bound_number, qty), data in batch_dict.items(): results.append({ **data, "bound_number": bound_number, "current_qty": qty, "total_batch_qty": batch_qty_dict[bound_number] # 添加批次总量 }) batch_totals =[] for bound_number, qty in batch_qty_dict.items(): batch_totals.append({ "bound_number": bound_number, "total_batch_qty": qty }) return Response( { "code": 200, "message": "Success", "data": { "count": len(results), "results": results, "batch_totals": batch_totals # 可选:单独返回批次总量 } }, status=status.HTTP_200_OK ) except Exception as e: return Response( {'code': 500, 'message': f'服务器错误: {str(e)}', 'data': None}, status=status.HTTP_200_OK ) # 托盘操作历史记录 class ContainerOperateViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "timestamp" ] filter_class = ContainerOperationFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return ContainerOperationModel.objects.filter( is_delete=False) else: return ContainerOperationModel.objects.filter( id=id, is_delete=False) else: return ContainerOperationModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return ContainerOperationGetSerializer elif self.action in ['create', 'update']: return ContainerOperationPostSerializer else: return self.http_method_not_allowed(request=self.request) def create(self, request, *args, **kwargs): data = self.request.data try: serializer = self.get_serializer(data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) # 记录成功日志 try: log_success_operation( request=self.request, operation_content=f"创建托盘操作记录:{data.get('container_code', '未知')}", operation_level="new", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, object_id=serializer.data.get('id'), module_name="托盘管理" ) except Exception as e: pass return Response(serializer.data, status=200, headers=headers) except Exception as e: # 记录失败日志 try: log_failure_operation( request=self.request, operation_content=f"创建托盘操作记录失败:{str(e)}", operation_level="new", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, module_name="托盘管理" ) except Exception as log_error: pass raise def update(self, request, pk): qs = self.get_object() data = self.request.data try: serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) # 记录成功日志 try: log_success_operation( request=self.request, operation_content=f"更新托盘操作记录 ID:{pk}", operation_level="update", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, object_id=pk, module_name="托盘管理" ) except Exception as e: pass return Response(serializer.data, status=200, headers=headers) except Exception as e: # 记录失败日志 try: log_failure_operation( request=self.request, operation_content=f"更新托盘操作记录失败 ID:{pk},错误: {str(e)}", operation_level="update", operator=self.request.auth.name if hasattr(self.request, 'auth') and self.request.auth else None, object_id=pk, module_name="托盘管理" ) except Exception as log_error: pass raise # 出库任务生成 class OutboundService: @staticmethod def generate_task_id(): """生成唯一任务ID(格式: outbound-年月-顺序号)""" month = timezone.now().strftime("%Y%m") last_task = ContainerWCSModel.objects.filter( tasktype='outbound', month=int(month) ).order_by('-sequence').first() sequence = last_task.sequence + 1 if last_task else 1 return f"outbound-{month}-{sequence:05d}" @staticmethod def send_task_to_wcs(task): """异步发送任务到WCS(非阻塞版本)""" reference_location = select_reference_location( task.tasktype, task.current_location, task.target_location ) if task.location_group_id is None or task.order_number in (None, 0): location_meta = resolve_location_group_metadata(reference_location) update_fields = [] if task.location_group_id is None and location_meta['location_group_id'] is not None: task.location_group_id = location_meta['location_group_id'] update_fields.append('location_group_id') if task.order_number in (None, 0) and location_meta['order_number'] not in (None, 0): task.order_number = location_meta['order_number'] update_fields.append('order_number') if update_fields: task.save(update_fields=update_fields) # 提取任务关键数据用于线程(避免直接传递ORM对象) task_data = { 'task_id': task.pk, # 使用主键而不是对象 'send_data': { "code":'200', "message": task.message, "data":{ "taskid": task.taskid, "container": task.container, "current_location": task.current_location, "target_location": task.target_location, "tasktype": task.tasktype, "month": task.month, "message": task.message, "status": task.status, "taskNumber": task.tasknumber-20000000000, "order_number":task.order_number, "sequence":task.sequence, "location_group_id": task.location_group_id or DEFAULT_LOCATION_GROUP_ID, } } } loggertask.info(f"任务号:{task.tasknumber-20000000000}任务发送请求:{task.container},起始位置:{task.current_location},目标位置:{task.target_location},返回结果:{task_data}") # 异步记录日志到数据库(不阻塞发送) log_thread = threading.Thread( target=OutboundService._async_log_handler, kwargs={ 'task_number': task.tasknumber - 20000000000, 'container': str(task.container), 'current_location': task.current_location, 'target_location': task.target_location, 'task_type': task.tasktype, 'order_number': task.order_number, 'sequence': task.sequence, 'response_data': task_data, 'log_type': task.tasktype or 'outbound', 'location_group_id': task.location_group_id or DEFAULT_LOCATION_GROUP_ID, }, daemon=True ) log_thread.start() # 创建并启动线程 thread = threading.Thread( target=OutboundService._async_send_handler, kwargs=task_data, daemon=True # 守护线程(主程序退出时自动终止) ) thread.start() return True # 立即返回表示已开始处理 @staticmethod def _extract_layer(location): """解析库位字符串中的楼层信息""" try: parts = str(location).split('-') return parts[3] if len(parts) >= 4 else None except Exception: return None @staticmethod def _get_running_layers(): """获取当前正在执行中的出库任务所在楼层""" running_locations = ContainerWCSModel.objects.filter( tasktype='outbound', status__gt=100, status__lt=300, working=1, is_delete=False ).values_list('current_location', flat=True) active_layers = set() for location in running_locations: layer = OutboundService._extract_layer(location) if layer: active_layers.add(layer) return active_layers @staticmethod def _async_log_handler(task_number, container, current_location, target_location, task_type, order_number, sequence, response_data, log_type='outbound', location_group_id=None): """异步记录 WCS 任务发送日志到数据库(不阻塞发送)""" try: close_old_connections() # 解析库位组与优先级 group_id = location_group_id order_number_value = order_number left_priority = None right_priority = None floor = None group_obj = None if group_id not in (None, 0): group_obj = LocationGroupModel.objects.filter( id=group_id ).only('id', 'group_code', 'left_priority', 'right_priority').first() if group_obj: left_priority = group_obj.left_priority right_priority = group_obj.right_priority if group_obj is None or order_number_value in (None, 0): try: parts = current_location.split('-') if len(parts) >= 4: row = int(parts[1]) col = int(parts[2]) layer = int(parts[3]) floor = parts[3] loc = LocationModel.objects.filter(row=row, col=col, layer=layer).only( 'location_group', 'c_number' ).first() if loc: group = group_obj or LocationGroupModel.objects.filter( group_code=loc.location_group ).first() if group: group_obj = group group_id = group.id left_priority = group.left_priority right_priority = group.right_priority if order_number_value in (None, 0): order_number_value = loc.c_number or DEFAULT_ORDER_NUMBER except Exception as e: logger.error(f"解析库位组信息失败: {e}") if group_id in (None, 0) or order_number_value in (None, 0): metadata = resolve_location_group_metadata(current_location) if group_id in (None, 0): group_id = metadata['location_group_id'] if order_number_value in (None, 0): order_number_value = metadata.get('order_number', DEFAULT_ORDER_NUMBER) # 创建日志记录 WCSTaskLogModel.objects.create( task_number=task_number, container=container, current_location=current_location, target_location=target_location, location_group_id=group_id or DEFAULT_LOCATION_GROUP_ID, left_priority=left_priority, right_priority=right_priority, task_type=task_type, order_number=order_number_value or DEFAULT_ORDER_NUMBER, sequence=sequence, response_data=response_data, floor=floor, log_type=log_type or task_type or 'outbound', ) except Exception as e: logger.error(f"记录 WCS 任务日志失败: {e}", exc_info=True) @staticmethod def _async_send_handler(task_id, send_data): """异步处理的实际工作函数""" try: # 每个线程需要独立的数据库连接 close_old_connections() # 重新获取任务对象(确保使用最新数据) task = ContainerWCSModel.objects.get(pk=task_id) # 发送第一个请求(不处理结果) # requests.post( # "http://127.0.0.1:8008/container/batch/", # json=send_data, # timeout=10 # ) # # 发送关键请求 response = requests.post( "http://192.168.18.200:1616/wcs/WebApi/getOutTask", json=send_data, timeout=10 ) # 处理响应 if response.status_code == 200: task.status = 200 task.save() logger.info(f"任务 {task.taskid} 已发送") else: logger.error(f"WCS返回错误: {response.text}") except Exception as e: logger.error(f"发送失败: {str(e)}") finally: close_old_connections() # 清理数据库连接 @staticmethod def create_initial_tasks(container_list,bound_list_id): """生成初始任务队列,返回楼层信息用于初始化发送""" with transaction.atomic(): current_WCS = ContainerWCSModel.objects.filter( tasktype='outbound', bound_list_id=bound_list_id, is_delete=False, status__lt=300 ).first() if current_WCS: logger.error(f"当前{bound_list_id}已有出库任务") return { "success": False, "msg": f"出库申请 {bound_list_id} 仍有待完成任务", } tasks = [] task_layers = set() start_sequence = ContainerWCSModel.objects.filter(tasktype='outbound').count() + 1 tasknumber = ContainerWCSModel.objects.filter().count() tasknumber_index = 1 for index, container in enumerate(container_list, start=start_sequence): container_obj = ContainerListModel.objects.filter(id =container['container_number']).first() if not container_obj: logger.error(f"托盘记录 {container['container_number']} 不存在") return { "success": False, "msg": "托盘信息缺失,无法创建任务", } if container_obj.current_location != container_obj.target_location: logger.error(f"托盘 {container_obj.container_code} 未到达目的地,不生成任务") return { "success": False, "msg": f"托盘 {container_obj.container_code} 未处于可出库状态", } # 检查前序作业 existing_task = ContainerWCSModel.objects.filter( container=container_obj.container_code, working=1, status__lt=300, is_delete=False ).exists() if existing_task: logger.error(f"托盘 {container_obj.container_code} 仍有未完成任务") return { "success": False, "msg": f"托盘 {container_obj.container_code} 仍有未完成任务", } OutBoundDetail_obj = OutBoundDetailModel.objects.filter(bound_list=bound_list_id,bound_batch_number_id=container['batch_id']).first() if not OutBoundDetail_obj: logger.error(f"批次 {container['batch_id']} 不存在") return { "success": False, "msg": f"批次 {container['batch_id']} 不存在", } month = int(timezone.now().strftime("%Y%m")) location_meta = get_task_location_metadata( "outbound", container_obj.current_location, "103" ) task = ContainerWCSModel( taskid=OutboundService.generate_task_id(), batch = OutBoundDetail_obj.bound_batch_number, batch_out = OutBoundDetail_obj.bound_batch, bound_list = OutBoundDetail_obj.bound_list, sequence=index, order_number = container.get('c_number') or location_meta['order_number'] or DEFAULT_ORDER_NUMBER, priority=100, tasknumber = month*100000+tasknumber_index+tasknumber, container=container_obj.container_code, current_location=container_obj.current_location, target_location="103", tasktype="outbound", month=int(timezone.now().strftime("%Y%m")), message="等待出库", status=100, location_group_id=location_meta['location_group_id'], ) layer = None try: parts = str(task.current_location).split('-') if len(parts) >= 4: layer = parts[3] except Exception: layer = None if layer: task_layers.add(layer) tasknumber_index += 1 tasks.append(task) container_obj = ContainerListModel.objects.filter(container_code=task.container).first() container_obj.target_location = task.target_location container_obj.save() ContainerWCSModel.objects.bulk_create(tasks) logger.info(f"已创建 {len(tasks)} 个初始任务") return { "success": True, "layers": sorted(task_layers), "task_count": len(tasks), } @staticmethod def create_initial_check_tasks(container_list,batch_id): """生成初始任务队列""" with transaction.atomic(): current_WCS = ContainerWCSModel.objects.filter(tasktype='check',batch_id = batch_id,is_delete=False,working=1).first() if current_WCS: logger.error(f"当前{batch_id}已有检查任务") return False tasks = [] start_sequence = ContainerWCSModel.objects.filter(tasktype='check').count() + 1 tasknumber = ContainerWCSModel.objects.filter().count() tasknumber_index = 1 for index, container in enumerate(container_list, start=start_sequence): container_obj = ContainerListModel.objects.filter(id =container['container_number']).first() if container_obj.current_location != container_obj.target_location: logger.error(f"托盘 {container_obj.container_code} 未到达目的地,不生成任务") return False batch_obj = BoundBatchModel.objects.filter(id =batch_id).first() month = int(timezone.now().strftime("%Y%m")) location_meta = get_task_location_metadata( "check", container_obj.current_location, "103" ) task = ContainerWCSModel( taskid=OutboundService.generate_task_id(), batch = batch_obj, batch_out = None, bound_list = None, sequence=index, order_number = container.get('c_number') or location_meta['order_number'] or DEFAULT_ORDER_NUMBER, priority=100, tasknumber = month*100000+tasknumber_index+tasknumber, container=container_obj.container_code, current_location=container_obj.current_location, target_location="103", tasktype="check", month=int(timezone.now().strftime("%Y%m")), message="等待出库", status=100, location_group_id=location_meta['location_group_id'], ) tasknumber_index += 1 tasks.append(task) container_obj = ContainerListModel.objects.filter(container_code=task.container).first() container_obj.target_location = task.target_location container_obj.save() ContainerWCSModel.objects.bulk_create(tasks) logger.info(f"已创建 {len(tasks)} 个初始任务") @staticmethod def insert_new_tasks(new_tasks): """动态插入新任务并重新排序""" with transaction.atomic(): pending_tasks = list(ContainerWCSModel.objects.filter(status=100)) # 插入新任务 for new_task_data in new_tasks: target_location = new_task_data.get('target_location', 'OUT01') location_meta = get_task_location_metadata( "outbound", new_task_data['current_location'], target_location ) order_number = new_task_data.get('order_number') if order_number in (None, 0): order_number = location_meta['order_number'] if order_number in (None, 0): order_number = DEFAULT_ORDER_NUMBER new_task = ContainerWCSModel( taskid=OutboundService.generate_task_id(), priority=new_task_data.get('priority', 100), order_number=order_number, container=new_task_data['container'], current_location=new_task_data['current_location'], target_location=target_location, tasktype="outbound", month=int(timezone.now().strftime("%Y%m")), message="等待出库", status=100, location_group_id=location_meta['location_group_id'], ) # 找到插入位置 insert_pos = 0 for i, task in enumerate(pending_tasks): if new_task.priority < task.priority: insert_pos = i break else: insert_pos = len(pending_tasks) pending_tasks.insert(insert_pos, new_task) # 重新分配顺序号 for i, task in enumerate(pending_tasks, start=1): task.sequence = i if task.pk is None: task.save() else: task.save(update_fields=['sequence']) logger.info(f"已插入 {len(new_tasks)} 个新任务") @staticmethod def process_next_task(single_task=False, preferred_layer=None, initial_layers=None): """处理下一个任务 - 支持前端可配置的跨楼层并发与同层排序 Args: single_task: 如果为True,只下发一条任务(用于WCS完成回调场景) 如果为False,批量下发多条任务(用于初始下发场景) preferred_layer: 优先选择的楼层(用于single_task=True时,优先同楼层任务) """ # 获取待处理任务,优先按批次排序(同一批次连续出),同一批次内按sequence排序 # 使用Case处理batch_out为None的情况,确保有批次的任务优先 from django.db.models import F, Case, When, IntegerField from django.conf import settings from .models import DispatchConfig def get_pending_tasks(): """获取待处理任务查询集""" return ContainerWCSModel.objects.filter( status=100, working=1, is_delete=False ).annotate( # 为排序添加批次ID字段,None值排最后 batch_out_id_for_sort=Case( When(batch_out__isnull=False, then=F('batch_out_id')), default=999999999, # None值使用大数字,排到最后 output_field=IntegerField() ) ).order_by('batch_out_id_for_sort', 'sequence') pending_tasks = get_pending_tasks() if not pending_tasks.exists(): logger.info("没有待处理任务") return # 读取调度配置(默认2条跨楼层并发) cfg = DispatchConfig.get_active_config() cross_floor_limit = max(1, int(cfg.cross_floor_concurrent_limit or 2)) desired_layers = set(initial_layers or []) active_layers = OutboundService._get_running_layers() # 处理任务列表 # 如果single_task=True,只下发1条;否则批量下发(最多cross_floor_limit条) processed_count = 0 if desired_layers: max_tasks = max(len(desired_layers), 1) else: max_tasks = 1 if single_task else cross_floor_limit skip_count = 0 max_skip = max(20, len(desired_layers) * 5) dispatched_ids = set() used_layers = set() blocked_layers = set() while processed_count < max_tasks and skip_count < max_skip: # 重新获取待处理任务(因为可能有任务被跳过) pending_tasks = get_pending_tasks().exclude(pk__in=dispatched_ids) if not pending_tasks.exists(): break if desired_layers: effective_layers = desired_layers - blocked_layers if not effective_layers: logger.info("初始楼层均存在执行中的任务,暂不新增同楼层下发") break if used_layers.issuperset(effective_layers): logger.info("已完成初始多楼层任务的分发") break # 如果single_task=True且提供了preferred_layer,优先选择同楼层的任务 next_task = None if desired_layers: effective_layers = desired_layers - blocked_layers remaining_layers = effective_layers - used_layers target_layers = remaining_layers if remaining_layers else effective_layers prioritized = [] fallback = [] for task in pending_tasks: task_layer = OutboundService._extract_layer(task.current_location) if task_layer in target_layers: prioritized.append(task) else: fallback.append(task) if prioritized: next_task = prioritized[0] elif remaining_layers: skip_count += 1 continue elif fallback: next_task = fallback[0] elif single_task and preferred_layer: # 先尝试找同楼层的任务(在同楼层任务中,仍然按批次和sequence排序) same_layer_tasks = [] other_layer_tasks = [] for task in pending_tasks: task_layer = OutboundService._extract_layer(task.current_location) if task_layer == preferred_layer: same_layer_tasks.append(task) else: other_layer_tasks.append(task) # 优先从同楼层任务中选择 if same_layer_tasks: next_task = same_layer_tasks[0] logger.info(f"优先选择同楼层任务,楼层: {preferred_layer}, 任务: {next_task.taskid}") # 如果没找到同楼层的任务,使用第一个任务(按批次和sequence排序) elif other_layer_tasks: next_task = other_layer_tasks[0] logger.info(f"未找到同楼层任务,使用其他楼层任务,任务: {next_task.taskid}") else: next_task = pending_tasks.first() else: # 根据同层排序策略,仍旧使用 batch_then_sequence(已在order_by体现) next_task = pending_tasks.first() if not next_task: break dispatched_ids.add(next_task.pk) location = next_task.current_location # 解析楼层(假设格式 Wxx-row-col-layer) task_layer = OutboundService._extract_layer(location) if location == '103' or location == '203': logger.info(f"需要跳过该任务: {next_task.taskid}, 位置: {location}") next_task.status = 200 next_task.working = 0 next_task.save() skip_count += 1 # 跳过这个任务后,继续处理下一个 continue if task_layer and task_layer in active_layers: logger.info(f"楼层 {task_layer} 已有执行中的任务,跳过下发: {next_task.taskid}") blocked_layers.add(task_layer) skip_count += 1 continue # 跨楼层并发控制:同一轮不允许重复楼层(仅批量下发时生效) if not single_task and not desired_layers and task_layer and task_layer in used_layers: skip_count += 1 continue try: allocator = LocationAllocation() allocation_success = perform_initial_allocation( allocator, next_task.current_location ) if not allocation_success: logger.warning(f"任务分配失败,跳过: {next_task.taskid}") skip_count += 1 continue OutboundService.send_task_to_wcs(next_task) # 标记任务为已下发,避免重复下发 next_task.status = 150 next_task.save(update_fields=['status']) processed_count += 1 if task_layer: used_layers.add(task_layer) active_layers.add(task_layer) logger.info(f"成功下发任务: {next_task.taskid}, 批次: {next_task.batch_out_id if next_task.batch_out else '无批次'}") except Exception as e: logger.error(f"任务处理失败: {next_task.taskid}, 错误: {str(e)}") # 处理失败后,继续尝试下一个任务 skip_count += 1 if processed_count > 0: logger.info(f"本次共下发 {processed_count} 条任务") elif skip_count >= max_skip: logger.warning(f"跳过了 {skip_count} 个任务,未找到可处理的任务") @staticmethod def process_current_task(task_id): """发送指定任务""" try: task = ContainerWCSModel.objects.get(taskid=task_id) allocator = LocationAllocation() perform_initial_allocation(allocator, task.current_location) OutboundService.send_task_to_wcs(task) except Exception as e: logger.error(f"任务处理失败: {str(e)}") class DispatchConfigView(APIView): """ 获取/更新任务下发调度配置 GET: 返回当前启用的配置 PUT: 更新配置(cross_floor_concurrent_limit, intra_floor_order, enabled) """ def get(self, request): cfg = DispatchConfig.get_active_config() return DRFResponse(DispatchConfigSerializer(cfg).data, status=200) def put(self, request): cfg = DispatchConfig.get_active_config() serializer = DispatchConfigSerializer(cfg, data=request.data, partial=True) serializer.is_valid(raise_exception=True) serializer.save() return DRFResponse(serializer.data, status=200) class WCSTaskLogViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) """ pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['-id', "-send_time", "send_time", ] filter_class = WCSTaskLogFilter def get_queryset(self): if self.request.user: return WCSTaskLogModel.objects.all() else: return WCSTaskLogModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'retrieve']: return WCSTaskLogSerializer else: return self.http_method_not_allowed(request=self.request) def perform_initial_allocation(allocator, location): """执行初始库位分配操作""" location_row = location.split('-')[1] location_col = location.split('-')[2] location_layer = location.split('-')[3] location_obj = LocationModel.objects.filter(row=location_row, col=location_col, layer=location_layer).first() if not location_obj: logger.error(f"未找到库位: {location}") return False location_code = location_obj.location_code operations = [ (allocator.update_location_status, location_code, 'reserved'), (allocator.update_location_group_status, location_code) ] for func, *args in operations: if not func(*args): logger.error(f"分配操作失败: {func.__name__}") return False return True # 出库任务下发 class OutTaskViewSet(ViewSet): """ # fun:get_out_task:下发出库任务 # fun:get_batch_count_by_boundlist:获取出库申请下的批次数量 # fun:generate_location_by_demand:根据出库需求生成出库任务 """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 def post(self, request): try: data = self.request.data logger.info(f"收到 出库 推送数据: {data}") # 从请求中获取 bound_list_id bound_list_id = data.get('bound_list_id') # 记录操作日志 try: log_operation( request=request, operation_content=f"接收出库任务请求,出库申请ID: {bound_list_id}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库管理" ) except Exception as log_error: pass current_WCS = ContainerWCSModel.objects.filter(tasktype='outbound',bound_list_id = bound_list_id,is_delete=False).first() if current_WCS: logger.info(f"当前{bound_list_id}已有出库任务{current_WCS.taskid}") if current_WCS.working == 1: OutboundService.process_current_task(current_WCS.taskid) # 记录成功日志 try: log_success_operation( request=request, operation_content=f"重新下发出库任务成功,出库申请ID: {bound_list_id},任务ID: {current_WCS.taskid}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库管理" ) except Exception as log_error: pass return Response({"code": 200, "msg": f"下发任务{ current_WCS.taskid }到WCS成功"}, status=200) else : # 记录任务处理中日志 try: log_operation( request=request, operation_content=f"出库任务正在处理中,出库申请ID: {bound_list_id},任务ID: {current_WCS.taskid}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库管理" ) except Exception as log_error: pass return Response({"code": 200, "msg": f"当前任务{current_WCS.taskid}正在处理中"}, status=200) # 获取关联的出库批次 out_batches = OutBatchModel.objects.filter( bound_list_id=bound_list_id, is_delete=False ).select_related('batch_number') if not out_batches.exists(): # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"出库任务创建失败,出库申请ID: {bound_list_id},未找到相关出库批次", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库管理" ) except Exception as log_error: pass return Response({"code": 404, "msg": "未找到相关出库批次"}, status=404) # 构建批次需求字典 batch_demand = {} for ob in out_batches: if ob.batch_number_id not in batch_demand: batch_demand[ob.batch_number_id] = { 'required': ob.goods_out_qty, 'allocated': ob.goods_qty, 'remaining': ob.goods_qty - ob.goods_out_qty } else: batch_demand[ob.batch_number_id]['required'] += ob.goods_out_qty batch_demand[ob.batch_number_id]['allocated'] += ob.goods_qty batch_demand[ob.batch_number_id]['remaining'] += (ob.goods_out_qty - ob.goods_qty) # 生成出库任务 generate_result = self.generate_location_by_demand( batch_demand=batch_demand, bound_list_id=bound_list_id ) if generate_result['code'] != '200': # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"出库任务生成失败,出库申请ID: {bound_list_id},错误: {generate_result.get('msg', '未知错误')}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库管理" ) except Exception as log_error: pass return Response(generate_result, status=500) # 创建并处理出库任务 container_list = generate_result['data'] # 2. 生成初始任务 creation_result = OutboundService.create_initial_tasks(container_list,bound_list_id) if not creation_result.get("success"): return Response( {"code": 400, "msg": creation_result.get("msg", "创建任务失败")}, status=400 ) # 3. 根据楼层信息初始化下发 initial_layers = creation_result.get("layers", []) if creation_result.get("task_count", 0) > 0: if len(initial_layers) > 1: OutboundService.process_next_task(initial_layers=initial_layers) else: OutboundService.process_next_task() # 记录成功日志 try: container_count = len(container_list) if container_list else 0 log_success_operation( request=request, operation_content=f"出库任务创建成功,出库申请ID: {bound_list_id},托盘数量: {container_count}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库管理" ) except Exception as log_error: pass return Response({"code": 200, "msg": "下发任务成功"}, status=200) except Exception as e: logger.error(f"任务生成失败: {str(e)}") # 记录异常日志 try: log_failure_operation( request=request, operation_content=f"出库任务生成异常,出库申请ID: {bound_list_id if 'bound_list_id' in locals() else '未知'},错误: {str(e)}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库管理" ) except Exception as log_error: pass return Response({"code": 200, "msg": str(e)}, status=200) def post_check(self, request): try: data = self.request.data logger.info(f"收到 出库抽检 推送数据: {data}") # 从请求中获取 batch_id batch_id = data.get('batch_id') container_demand = int(data.get('container_demand')) # 记录操作日志 try: log_operation( request=request, operation_content=f"接收出库抽检任务请求,批次ID: {batch_id},抽检托盘数量: {container_demand}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库抽检管理" ) except Exception as log_error: pass if not batch_id or not container_demand: # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"出库抽检任务创建失败,缺少抽检数目或批次号", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库抽检管理" ) except Exception as log_error: pass return Response({"code": "400", "msg": "缺少抽检数目或批次号"}, status=200) current_WCS = ContainerWCSModel.objects.filter(batch=batch_id,tasktype='check',is_delete=False,working=1).first() if current_WCS: logger.info(f"当前{batch_id}已有出库抽检任务{current_WCS.taskid}") if current_WCS.working == 1: OutboundService.process_current_task(current_WCS.taskid) # 记录成功日志 try: log_success_operation( request=request, operation_content=f"重新下发出库抽检任务成功,批次ID: {batch_id},任务ID: {current_WCS.taskid}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库抽检管理" ) except Exception as log_error: pass return Response({"code": 200, "msg": f"下发任务{ current_WCS.taskid }到WCS成功"}, status=200) else : # 记录任务处理中日志 try: log_operation( request=request, operation_content=f"出库抽检任务正在处理中,批次ID: {batch_id},任务ID: {current_WCS.taskid}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库抽检管理" ) except Exception as log_error: pass return Response({"code": 200, "msg": f"当前任务{current_WCS.taskid}正在处理中"}, status=200) # 获取批次号 generate_result = self.generate_location_by_check(batch_id,container_demand) if generate_result['code'] != '200': # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"出库抽检任务生成失败,批次ID: {batch_id},错误: {generate_result.get('msg', '未知错误')}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库抽检管理" ) except Exception as log_error: pass return Response(generate_result, status=200) # 创建并处理出库任务 container_list = generate_result['data'] # 3. 立即发送第一个任务 OutboundService.create_initial_check_tasks(container_list,batch_id) OutboundService.process_next_task() # 记录成功日志 try: container_count = len(container_list) if container_list else 0 log_success_operation( request=request, operation_content=f"出库抽检任务创建成功,批次ID: {batch_id},托盘数量: {container_count}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库抽检管理" ) except Exception as log_error: pass return Response({"code": 200, "msg": "下发任务成功"}, status=200) except Exception as e: logger.error(f"任务生成失败: {str(e)}") # 记录异常日志 try: log_failure_operation( request=request, operation_content=f"出库抽检任务生成异常,批次ID: {batch_id if 'batch_id' in locals() else '未知'},错误: {str(e)}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "WCS系统", module_name="WCS出库抽检管理" ) except Exception as log_error: pass return Response({"code": 200, "msg": str(e)}, status=200) def get_batch_count_by_boundlist(self,bound_list_id): try: bound_list_obj_all = OutBoundDetailModel.objects.filter(bound_list=bound_list_id).all() if bound_list_obj_all: batch_count_dict = {} # 统计批次数量(创建哈希表,去重) for batch in bound_list_obj_all: if batch.bound_batch_number_id not in batch_count_dict: batch_count_dict[batch.bound_batch_number_id] = batch.bound_batch.goods_out_qty else: batch_count_dict[batch.bound_batch_number_id] += batch.bound_batch.goods_out_qty return batch_count_dict else: logger.error(f"查询批次数量失败: {bound_list_id} 不存在") return {} except Exception as e: logger.error(f"查询批次数量失败: {str(e)}") return {} def get_location_by_status_and_batch(self,status,bound_id): try: container_obj = ContainerDetailModel.objects.filter(batch=bound_id,status=status,is_delete=False).all() if container_obj: container_dict = {} # 统计托盘数量(创建哈希表,去重) for obj in container_obj: if obj.container_id not in container_dict: container_dict[obj.container_id] = obj.goods_qty else: container_dict[obj.container_id] += obj.goods_qty return container_dict else: logger.error(f"查询{status}状态的批次数量失败: {bound_id} 不存在") return {} except Exception as e: logger.error(f"查询{status}状态的批次数量失败: {str(e)}") return {} def get_order_by_batch(self,container_list,bound_id): try: container_dict = {} for container in container_list: location_container = LocationContainerLink.objects.filter(container_id=container,is_active=True).first() if location_container: location_c_number = location_container.location.c_number if container not in container_dict: container_dict[container] = { "container_number":container, "location_c_number":location_c_number, "location_id ":location_container.location.id, "location_type":location_container.location.location_type, "batch_id":bound_id, } if len(container_dict.keys()) == len(container_list): return container_dict else: logger.error(f"查询批次数量失败: {container_list} 不存在") return {} except Exception as e: logger.error(f"查询批次数量失败: {str(e)}") return {} except Exception as e: logger.error(f"查询{status}状态的批次数量失败: {str(e)}") return {} def get_container_allocation(self, batch_id): """兼容所有数据库的去重方案""" # 获取唯一托盘ID列表 container_ids = ( ContainerDetailModel.objects .filter(batch_id=batch_id, status=2,is_delete=False) .values_list('container_id', flat=True) .distinct() ) # 获取每个托盘的最新明细(按id倒序) return ( ContainerDetailModel.objects .filter(container_id__in=container_ids,batch_id=batch_id, status=2,is_delete=False) .select_related('container') .prefetch_related( Prefetch('container__location_links', queryset=LocationContainerLink.objects.select_related('location'), to_attr='active_location') ) .order_by('container_id', '-id') ) def generate_location_by_check(self,batch_id,container_demand): ''' 根据抽检托盘数目,把相应的库位给到出库任务 ''' try: return_data = [] # 获取已去重的托盘列表 container_qs = self.get_container_allocation(batch_id) # 构建托盘信息字典(自动去重) container_map = {} for cd in container_qs: if cd.container_id in container_map: continue # 获取有效库位信息 active_location = next( (link.location for link in cd.container.active_location if link.is_active), None ) container_map[cd.container_id] = { 'detail': cd, 'container': cd.container, 'location': active_location } # 转换为排序列表 container_list = list(container_map.values()) sorted_containers = sorted( container_list, key=lambda x: ( self._get_goods_class_priority(x['detail'].goods_class), -(x['location'].c_number if x['location'] else 0), # -(x['location'].layer if x['location'] else 0), # x['location'].row if x['location'] else 0, # x['location'].col if x['location'] else 0 ) ) for item in sorted_containers: if container_demand <= 0: break # 获取可分配数量 # 记录分配信息 allocate_container = { "container_number": item['container'].id, "batch_id": batch_id, "location_code": item['location'].location_code if item['location'] else 'N/A', "allocate_qty": 0, "c_number": item['location'].c_number if item['location'] else 0 } return_data.append(allocate_container) container_demand -= 1 # 降重 return_data,以container_number为key return_data = list({v['container_number']: v for v in return_data}.values()) # 排序 return_data = sorted(return_data, key=lambda x: -x['c_number']) return {"code": "200", "msg": "Success", "data": return_data} except Exception as e: logger.error(f"出库任务生成失败: {str(e)}", exc_info=True) return {"code": "500", "msg": str(e)} def generate_location_by_demand(self, batch_demand, bound_list_id): try: return_data = [] for batch_id, demand in batch_demand.items(): # 获取已去重的托盘列表 container_qs = self.get_container_allocation(batch_id) # 构建托盘信息字典(自动去重) container_map = {} for cd in container_qs: if cd.container_id in container_map: container_map[cd.container_id]['goods_qty'] += cd.goods_qty - cd.goods_out_qty continue # 获取有效库位信息 active_location = next( (link.location for link in cd.container.active_location if link.is_active), None ) container_map[cd.container_id] = { 'detail': cd, 'goods_qty': cd.goods_qty - cd.goods_out_qty, 'container': cd.container, 'location': active_location } # 转换为排序列表 container_list = list(container_map.values()) # 多维度排序(优化性能版) sorted_containers = sorted( container_list, key=lambda x: ( self._get_goods_class_priority(x['detail'].goods_class), -(x['location'].c_number if x['location'] else 0), # -(x['location'].layer if x['location'] else 0), # x['location'].row if x['location'] else 0, # x['location'].col if x['location'] else 0 ) ) # 分配逻辑 required = demand['required'] for item in sorted_containers: if required <= 0: break # 获取可分配数量 allocatable = item['goods_qty'] allocate_qty = min(required, allocatable) # 记录分配信息 allocate_container = { "container_number": item['container'].id, "batch_id": batch_id, "location_code": item['location'].location_code if item['location'] else 'N/A', "allocate_qty": allocate_qty, "c_number": item['location'].c_number if item['location'] else 0 } return_data.append(allocate_container) required -= allocate_qty # 更新数据库状态(需要事务处理) self._update_allocation_status(allocate_container, allocate_qty,bound_list_id) # 降重 return_data,以container_number为key return_data = list({v['container_number']: v for v in return_data}.values()) # 排序 return_data = sorted(return_data, key=lambda x: -x['c_number']) return {"code": "200", "msg": "Success", "data": return_data} except Exception as e: logger.error(f"出库任务生成失败: {str(e)}", exc_info=True) return {"code": "500", "msg": str(e)} def _get_goods_class_priority(self, goods_class): """货物类型优先级权重""" return { 3: 0, # 散盘最高 1: 1, # 成品次之 2: 2 # 空盘最低 }.get(goods_class, 99) def _update_allocation_status(self, allocate_container, allocate_qty,bound_list_id): """事务化更新分配状态""" try: # 更新托盘明细 container_detail_all = ContainerDetailModel.objects.filter( container_id=allocate_container['container_number'], batch_id=allocate_container['batch_id'], is_delete=False ).all() left_qty = 0 for cd in container_detail_all: if left_qty - allocate_qty >= 0: break add_qty = min(allocate_qty-left_qty, cd.goods_qty - cd.goods_out_qty) if add_qty == 0: continue left_qty += add_qty last_out_qty = cd.goods_out_qty cd.goods_out_qty += add_qty # print(f"{left_qty/25} 更新托盘 {cd.container.container_code} 批次 {cd.batch_id} 出库数量: {cd.goods_out_qty}") cd.save() out_batch_detail.objects.create( out_bound_id=bound_list_id, container_id=cd.container_id, container_detail_id=cd.id, out_goods_qty=add_qty, last_out_goods_qty = last_out_qty, working = 1, is_delete = False ) return True except Exception as e: logger.error(f"状态更新失败: {str(e)}") return False def create_or_update_container_operation(self,container_obj,batch_id,bound_id,to_location,goods_qty,goods_weight): try: container_operation_obj = ContainerOperationModel.objects.filter(container=container_obj,batch_id=batch_id,bound_id=bound_id,operation_type="outbound").first() if container_operation_obj: logger.info(f"[0]查询出库任务: {container_operation_obj.operation_type} ") logger.info(f"更新出库任务: {container_obj.container_code} 批次 {batch_id} 出库需求: {bound_id} 数量: {goods_qty} 重量: {goods_weight}") container_operation_obj.to_location = to_location container_operation_obj.goods_qty = goods_qty container_operation_obj.goods_weight = goods_weight container_operation_obj.save() # 记录更新日志(内部方法,不记录详细日志,避免过多记录) else: logger.info(f"创建出库任务: {container_obj.container_code} 批次 {batch_id} 出库需求: {bound_id} 数量: {goods_qty} 重量: {goods_weight}") batch = BoundBatchModel.objects.filter(id=batch_id).first() if not batch: return {"code": "500", "msg": f"批次 {batch_id} 不存在"} ContainerOperationModel.objects.create( month = int(timezone.now().strftime("%Y%m")), container = container_obj, goods_code = batch.goods_code, goods_desc = batch.goods_desc, operation_type ="outbound", batch_id = batch_id, bound_id = bound_id, goods_qty = goods_qty, goods_weight = goods_weight, from_location = container_obj.current_location, to_location= to_location, timestamp=timezone.now(), operator="WMS", memo=f"出库需求: {bound_id}, 批次: {batch_id}, 数量: {goods_qty}" ) # 记录创建日志(内部方法,不记录详细日志,避免过多记录) return {"code": "200", "msg": "Success"} except Exception as e: return {"code": "500", "msg": str(e)} def update_container_detail_out_qty(self,container_obj,batch_id): try: logger.info(f"[1]更新托盘出库数量: {container_obj.container_code} 批次 {batch_id} ") container_operation_obj = ContainerOperationModel.objects.filter(container=container_obj,batch_id=batch_id,operation_type="outbound").all() if not container_operation_obj: logger.error(f"[1]批次 {batch_id} 托盘 {container_obj.container_code} 无出库任务") return {"code": "500", "msg": f"批次 {batch_id} 托盘 {container_obj.container_code} 无出库任务"} container_detail_obj = ContainerDetailModel.objects.filter(container=container_obj,batch_id=batch_id,status=2,is_delete=False).first() if not container_detail_obj: logger.error(f"[1]批次 {batch_id} 托盘 {container_obj.container_code} 无批次信息") return {"code": "500", "msg": f"批次 {batch_id} 托盘 {container_obj.container_code} 无批次信息"} out_qty = 0 for obj in container_operation_obj: out_qty += obj.goods_qty if out_qty >= container_detail_obj.goods_qty: out_qty = container_detail_obj.goods_qty container_detail_obj.status = 3 break if out_qty == 0: logger.error(f"[1]批次 {batch_id} 托盘 {container_obj.container_code} 无出库数量") return {"code": "500", "msg": f"批次 {batch_id} 托盘 {container_obj.container_code} 无出库数量"} container_detail_obj.goods_out_qty = out_qty container_detail_obj.save() return {"code": "200", "msg": "Success"} except Exception as e: return {"code": "500", "msg": str(e)} # 出库任务监测 class BatchViewSet(viewsets.ModelViewSet): authentication_classes = [] # 禁用所有认证类 permission_classes = [AllowAny] # 允许任意访问 def wcs_post(self, request, *args, **kwargs): data = self.request.data logger.info(f"收到 WMS 推送数据: {data}") return Response({"code": "200", "msg": "Success"}, status=200) # views.py class OutDetailViewSet(viewsets.ModelViewSet): pagination_class = MyPageNumberPagination serializer_class = OutBoundDetailSerializer def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): """根据不同的action调整查询集""" if self.action == 'list': # 获取每个out_bound的最新一条记录 # 子查询,用于获取每个out_bound对应的最新out_batch_detail记录 subquery = out_batch_detail.objects.filter( out_bound=OuterRef('out_bound') ).order_by('-id') # 返回最新的out_batch_detail记录,通过子查询的结果进行过滤 return out_batch_detail.objects.filter( id=Subquery(subquery.values('id')[:1]) ) return out_batch_detail.objects.all() def retrieve(self, request, *args, **kwargs): """重写retrieve方法返回关联集合""" qs = self.get_project() queryset = self.filter_queryset( out_batch_detail.objects.filter(out_bound = qs) ) # 分页处理 # page = self.paginate_queryset(queryset) # if queryset is not None: # serializer = self.get_serializer(queryset, many=True) # return self.get_paginated_response(serializer.data) serializer = self.get_serializer(queryset, many=True) return Response(serializer.data) def get_serializer_class(self): """根据action切换序列化器""" if self.action == 'retrieve': return OutBoundFullDetailSerializer return super().get_serializer_class() def get_out_batch_detail(self, request): """获取某个托盘的出库明细""" try: container_code = request.query_params.get('container_code') container_obj = ContainerListModel.objects.filter(container_code=container_code).first() if not container_obj: return Response({"code": "500", "message": f"托盘 {container_code} 不存在"}, status=status.HTTP_200_OK) out_batch_detail_all = out_batch_detail.objects.filter(container=container_obj,working=1,is_delete=False).all() if not out_batch_detail_all: return Response({"code": "500", "message": f"托盘 {container_code} 无出库明细"}, status=status.HTTP_200_OK) serializer = OutBoundFullDetailSerializer(out_batch_detail_all, many=True) return Response({"code": "200", "message": "Success", "data": serializer.data}, status=status.HTTP_200_OK) except Exception as e: return Response({"code": "500", "message": str(e)}, status=status.HTTP_200_OK) def confirm_out_batch_detail(self, request): """确认出库 - 支持按批次和数量部分确认""" try: container_code = request.data.get('container_code') confirm_items = request.data.get('confirm_items', []) # 格式: [{"detail_id": 1, "confirm_qty": 10}, ...] # 记录操作日志 try: log_operation( request=request, operation_content=f"接收确认出库请求,托盘编码: {container_code},确认项数量: {len(confirm_items) if confirm_items else '全部'}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库确认管理" ) except Exception as log_error: pass if not container_code: # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"确认出库失败,缺少托盘编码参数", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库确认管理" ) except Exception as log_error: pass return Response({"code": "500", "message": "缺少托盘编码参数"}, status=status.HTTP_200_OK) container_obj = ContainerListModel.objects.filter(container_code=container_code).first() if not container_obj: # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"确认出库失败,托盘 {container_code} 不存在", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库确认管理" ) except Exception as log_error: pass return Response({"code": "500", "message": f"托盘 {container_code} 不存在"}, status=status.HTTP_200_OK) # 如果没有指定确认项,则确认所有出库明细(保持向后兼容) if not confirm_items: out_batch_detail_all = out_batch_detail.objects.filter(container=container_obj,working=1,is_delete=False).order_by('-id').all() if not out_batch_detail_all: # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"确认出库失败,托盘 {container_code} 无出库明细", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库确认管理" ) except Exception as log_error: pass return Response({"code": "500", "message": f"托盘 {container_code} 无出库明细"}, status=status.HTTP_200_OK) total_qty = 0 batch_list = [] for obj in out_batch_detail_all: obj.working = 0 obj.save() total_qty += obj.out_goods_qty batch_list.append(f"批次{obj.container_detail.batch.id}(数量:{obj.out_goods_qty})") BatchOperateLogModel.objects.create( batch_id = obj.container_detail.batch, log_type = 1, log_date = timezone.now(), goods_code = obj.container_detail.batch.goods_code, goods_desc = obj.container_detail.batch.goods_desc, goods_qty = obj.out_goods_qty, log_content = f"出库托盘 {container_code} 批次 {obj.container_detail.batch.id} 数量 {obj.out_goods_qty}", creater = "WMS", openid = "WMS" ) # 记录成功日志 try: log_success_operation( request=request, operation_content=f"确认出库成功,托盘编码: {container_code},确认明细数: {len(out_batch_detail_all)},总数量: {total_qty},批次: {', '.join(batch_list)}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库确认管理" ) except Exception as log_error: pass return Response({"code": "200", "message": "出库成功"}, status=status.HTTP_200_OK) # 按批次和数量部分确认 from decimal import Decimal for item in confirm_items: out_detail_id = item.get('detail_id') # 这是 out_batch_detail 的 ID confirm_qty = Decimal(str(item.get('confirm_qty', 0))) if not out_detail_id or confirm_qty <= 0: continue # 确保 out_detail_id 是整数类型 try: out_detail_id = int(out_detail_id) except (ValueError, TypeError): logger.error(f"无效的 out_detail_id: {out_detail_id}") continue # 添加调试日志 logger.info(f"查询出库明细: container={container_obj.container_code}, out_batch_detail_id={out_detail_id}") # 直接通过 out_batch_detail 的 ID 查找记录 out_detail = out_batch_detail.objects.filter( id=out_detail_id, container_id=container_obj.id, working=1, is_delete=False ).first() if not out_detail: # 尝试不限制 working 状态查找 out_detail_any = out_batch_detail.objects.filter( id=out_detail_id, container_id=container_obj.id, is_delete=False ).first() if out_detail_any: if out_detail_any.working == 0: logger.warning(f"出库明细 {out_detail_id} 的 working 状态为 0,可能已经被处理过了") else: logger.warning(f"出库明细 {out_detail_id} 的 working 状态为 {out_detail_any.working},不是 1") else: logger.error(f"未找到出库明细: out_batch_detail_id={out_detail_id}, container={container_obj.container_code}") continue # 计算本次要确认的数量(不能超过该出库明细的数量) out_qty_to_confirm = min(confirm_qty, out_detail.out_goods_qty) # 如果确认数量等于或大于出库数量,则标记为已完成 if out_qty_to_confirm >= out_detail.out_goods_qty: out_detail.working = 0 out_detail.save() else: # 部分确认,创建新的确认记录并减少剩余数量 # 创建一个新的 out_batch_detail 记录用于剩余数量 remaining_qty = out_detail.out_goods_qty - out_qty_to_confirm out_detail.out_goods_qty = out_qty_to_confirm out_detail.working = 0 out_detail.save() # 创建剩余数量的记录(如果需要) if remaining_qty > 0: out_batch_detail.objects.create( out_bound_id=out_detail.out_bound_id, container_id=out_detail.container_id, container_detail_id=out_detail.container_detail_id, out_goods_qty=remaining_qty, last_out_goods_qty=out_detail.last_out_goods_qty, working=1, is_delete=False ) # 创建操作日志 BatchOperateLogModel.objects.create( batch_id = out_detail.container_detail.batch, log_type = 1, log_date = timezone.now(), goods_code = out_detail.container_detail.batch.goods_code, goods_desc = out_detail.container_detail.batch.goods_desc, goods_qty = out_qty_to_confirm, log_content = f"出库托盘 {container_code} 批次 {out_detail.container_detail.batch.id} 数量 {out_qty_to_confirm}", creater = "WMS", openid = "WMS" ) logger.info(f"确认出库明细 {out_detail_id}: 确认数量={out_qty_to_confirm}, 剩余数量={out_detail.out_goods_qty}") # 记录成功日志 try: confirmed_count = len(confirm_items) total_confirmed_qty = sum(Decimal(str(item.get('confirm_qty', 0))) for item in confirm_items) log_success_operation( request=request, operation_content=f"确认出库成功,托盘编码: {container_code},确认明细数: {confirmed_count},总确认数量: {total_confirmed_qty}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库确认管理" ) except Exception as log_error: pass return Response({"code": "200", "message": "出库成功"}, status=status.HTTP_200_OK) except Exception as e: logger.error(f"确认出库失败: {str(e)}", exc_info=True) # 记录异常日志 try: log_failure_operation( request=request, operation_content=f"确认出库异常,托盘编码: {container_code if 'container_code' in locals() else '未知'},错误: {str(e)}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库确认管理" ) except Exception as log_error: pass return Response({"code": "500", "message": str(e)}, status=status.HTTP_200_OK) def cancel_out_batch_detail(self, request): """取消出库 - 支持按批次和数量部分取消""" try: container_code = request.data.get('container_code') cancel_items = request.data.get('cancel_items', []) # 格式: [{"detail_id": 1, "cancel_qty": 10}, ...] # 记录操作日志 try: log_operation( request=request, operation_content=f"接收取消出库请求,托盘编码: {container_code},取消项数量: {len(cancel_items) if cancel_items else '全部'}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库取消管理" ) except Exception as log_error: pass if not container_code: # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"取消出库失败,缺少托盘编码参数", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库取消管理" ) except Exception as log_error: pass return Response({"code": "500", "message": "缺少托盘编码参数"}, status=status.HTTP_200_OK) container_obj = ContainerListModel.objects.filter(container_code=container_code).first() if not container_obj: # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"取消出库失败,托盘 {container_code} 不存在", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库取消管理" ) except Exception as log_error: pass return Response({"code": "500", "message": f"托盘 {container_code} 不存在"}, status=status.HTTP_200_OK) # 如果没有指定取消项,则取消所有出库明细(保持向后兼容) if not cancel_items: out_batch_detail_all = out_batch_detail.objects.filter(container=container_obj,working=1,is_delete=False).order_by('-id').all() if not out_batch_detail_all: # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"取消出库失败,托盘 {container_code} 无出库明细", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库取消管理" ) except Exception as log_error: pass return Response({"code": "500", "message": f"托盘 {container_code} 无出库明细"}, status=status.HTTP_200_OK) total_cancel_qty = 0 batch_list = [] for obj in out_batch_detail_all: total_cancel_qty += obj.out_goods_qty batch_list.append(f"批次{obj.container_detail.batch.id}(数量:{obj.out_goods_qty})") obj.container_detail.goods_out_qty = obj.last_out_goods_qty obj.container_detail.save() obj.is_delete = True obj.working = 0 obj.save() # 记录成功日志 try: log_success_operation( request=request, operation_content=f"取消出库成功,托盘编码: {container_code},取消明细数: {len(out_batch_detail_all)},总取消数量: {total_cancel_qty},批次: {', '.join(batch_list)}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库取消管理" ) except Exception as log_error: pass return Response({"code": "200", "message": "出库取消成功"}, status=status.HTTP_200_OK) # 按批次和数量部分取消 from decimal import Decimal for item in cancel_items: out_detail_id = item.get('detail_id') # 这是 out_batch_detail 的 ID cancel_qty = Decimal(str(item.get('cancel_qty', 0))) if not out_detail_id or cancel_qty <= 0: continue # 确保 out_detail_id 是整数类型 try: out_detail_id = int(out_detail_id) except (ValueError, TypeError): logger.error(f"无效的 out_detail_id: {out_detail_id}") continue # 添加调试日志 logger.info(f"查询出库明细: container={container_obj.container_code}, out_batch_detail_id={out_detail_id}") # 直接通过 out_batch_detail 的 ID 查找记录 out_detail = out_batch_detail.objects.filter( id=out_detail_id, container_id=container_obj.id, working=1, is_delete=False ).first() if not out_detail: # 尝试不限制 working 状态查找 out_detail_any = out_batch_detail.objects.filter( id=out_detail_id, container_id=container_obj.id, is_delete=False ).first() if out_detail_any: if out_detail_any.working == 0: logger.warning(f"出库明细 {out_detail_id} 的 working 状态为 0,可能已经被处理过了") else: logger.warning(f"出库明细 {out_detail_id} 的 working 状态为 {out_detail_any.working},不是 1") else: logger.error(f"未找到出库明细: out_batch_detail_id={out_detail_id}, container={container_obj.container_code}") continue # 计算本次要取消的数量(不能超过该出库明细的数量) out_qty_to_cancel = min(cancel_qty, out_detail.out_goods_qty) # 更新容器明细的出库数量 out_detail.container_detail.goods_out_qty -= out_qty_to_cancel out_detail.container_detail.goods_out_qty = max( out_detail.container_detail.goods_out_qty, out_detail.last_out_goods_qty ) out_detail.container_detail.save() # 如果全部取消,则删除或标记该出库明细 if out_detail.out_goods_qty <= out_qty_to_cancel: out_detail.is_delete = True out_detail.working = 0 out_detail.out_goods_qty = Decimal('0') # 清零 else: # 部分取消,减少出库数量 out_detail.out_goods_qty -= out_qty_to_cancel out_detail.save() logger.info(f"取消出库明细 {out_detail_id}: 取消数量={out_qty_to_cancel}, 剩余数量={out_detail.out_goods_qty}") # 记录成功日志 try: canceled_count = len(cancel_items) total_canceled_qty = sum(Decimal(str(item.get('cancel_qty', 0))) for item in cancel_items) log_success_operation( request=request, operation_content=f"取消出库成功,托盘编码: {container_code},取消明细数: {canceled_count},总取消数量: {total_canceled_qty}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库取消管理" ) except Exception as log_error: pass return Response({"code": "200", "message": "出库取消成功"}, status=status.HTTP_200_OK) except Exception as e: logger.error(f"取消出库失败: {str(e)}", exc_info=True) # 记录异常日志 try: log_failure_operation( request=request, operation_content=f"取消出库异常,托盘编码: {container_code if 'container_code' in locals() else '未知'},错误: {str(e)}", operation_level="other", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库取消管理" ) except Exception as log_error: pass return Response({"code": "500", "message": str(e)}, status=status.HTTP_200_OK) def get_contianer_detail(self, request): try: container_code = request.query_params.get('container_code') # 记录操作日志(查询操作) try: log_operation( request=request, operation_content=f"查询托盘明细,托盘编码: {container_code}", operation_level="view", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库明细管理" ) except Exception as log_error: pass container_obj = ContainerListModel.objects.filter(container_code=container_code).first() if not container_obj: # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"查询托盘明细失败,托盘 {container_code} 不存在", operation_level="view", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库明细管理" ) except Exception as log_error: pass return Response({"code": "500", "message": f"托盘 {container_code} 不存在"}, status=status.HTTP_200_OK) container_detail_all = ContainerDetailModel.objects.filter(container=container_obj,is_delete=False).all().exclude(status__in=[3]) return_data=[] for obj in container_detail_all: return_data.append({ "id": obj.id, "batch": obj.batch.bound_number, "goods_code": obj.goods_code, "goods_desc": obj.goods_desc, "goods_qty" :obj.goods_qty, "out_goods_qty": obj.goods_out_qty }) # 记录成功日志(查询成功) try: log_success_operation( request=request, operation_content=f"查询托盘明细成功,托盘编码: {container_code},明细数量: {len(return_data)}", operation_level="view", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库明细管理" ) except Exception as log_error: pass return Response({"code": "200", "message": "Success", "data": return_data}, status=status.HTTP_200_OK) except Exception as e: # 记录异常日志 try: log_failure_operation( request=request, operation_content=f"查询托盘明细异常,托盘编码: {container_code if 'container_code' in locals() else '未知'},错误: {str(e)}", operation_level="view", operator=request.auth.name if hasattr(request, 'auth') and request.auth else "系统", module_name="出库明细管理" ) except Exception as log_error: pass return Response({"code": "500", "message": str(e)}, status=status.HTTP_200_OK) def change_container_out_qty(self, request): try: container_code = request.data.get('container_code') container_obj = ContainerListModel.objects.filter(container_code=container_code).first() if not container_obj: return Response({"code": "500", "message": f"托盘 {container_code} 不存在"}, status=status.HTTP_200_OK) logger.info(f"change_container_out_qty: {request.data}") for container_detail_id, out_qty in request.data.get('detail_list').items(): container_detail_obj = ContainerDetailModel.objects.filter(id=container_detail_id,is_delete=False).first() if not container_detail_obj: continue from decimal import Decimal out_qty = Decimal(out_qty) container_detail_obj.goods_out_qty += out_qty container_detail_obj.save() return Response({"code": "200", "message": "Success"}, status=status.HTTP_200_OK) except Exception as e: return Response({"code": "500", "message": str(e)}, status=status.HTTP_200_OK)