from wsgiref import headers from rest_framework.views import APIView from rest_framework import viewsets from utils.page import MyPageNumberPagination from rest_framework.filters import OrderingFilter from django_filters.rest_framework import DjangoFilterBackend from rest_framework.response import Response from django.utils import timezone import requests from django.db import transaction import logging from rest_framework import status from .models import ContainerListModel,ContainerDetailModel,ContainerOperationModel,ContainerWCSModel,TaskModel from bound.models import BoundBatchModel,BoundDetailModel,BoundListModel,OutBoundDetailModel from bin.views import LocationAllocation,base_location from bin.models import LocationModel,LocationContainerLink,LocationGroupModel from bound.models import BoundBatchModel from .serializers import ContainerDetailGetSerializer,ContainerDetailPostSerializer from .serializers import ContainerListGetSerializer,ContainerListPostSerializer from .serializers import ContainerOperationGetSerializer,ContainerOperationPostSerializer from .serializers import TaskGetSerializer,TaskPostSerializer from .serializers import WCSTaskGetSerializer from .filter import ContainerDetailFilter,ContainerListFilter,ContainerOperationFilter,TaskFilter,WCSTaskFilter from rest_framework.permissions import AllowAny import threading from django.db import close_old_connections from bin.services import AllocationService logger = logging.getLogger(__name__) class ContainerListViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = ContainerListFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return ContainerListModel.objects.filter() else: return ContainerListModel.objects.filter(id=id) else: return ContainerListModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return ContainerListGetSerializer elif self.action in ['create', 'update']: return ContainerListPostSerializer else: return self.http_method_not_allowed(request=self.request) def create(self, request, *args, **kwargs): # 创建托盘:托盘码五位数字(唯一),当前库位,目标库位,状态,最后操作时间 container_all = ContainerListModel.objects.all().order_by('container_code') if container_all.count() == 0: container_code = 12345 else: container_code = container_all.last().container_code + 1 container_obj = ContainerListModel.objects.create( container_code=container_code, current_location='N/A', target_location='N/A', status=0, last_operation=timezone.now() ) serializer = ContainerListGetSerializer(container_obj) headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=201, headers=headers) def update(self, request, pk): qs = self.get_object() data = self.request.data serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=200, headers=headers) class WCSTaskViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['-id', "-create_time", "update_time", ] filter_class = WCSTaskFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return ContainerWCSModel.objects.filter() else: return ContainerWCSModel.objects.filter(id=id) else: return ContainerWCSModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return WCSTaskGetSerializer else: return self.http_method_not_allowed(request=self.request) class TaskViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = TaskFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return TaskModel.objects.filter() else: return TaskModel.objects.filter(id=id) else: return TaskModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return TaskGetSerializer elif self.action in ['create', 'update']: return TaskPostSerializer else: return self.http_method_not_allowed(request=self.request) def create(self, request, *args, **kwargs): data = self.request.data return Response(data, status=200, headers=headers) def update(self, request, pk): qs = self.get_object() data = self.request.data serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=200, headers=headers) class TaskRollbackMixin: @transaction.atomic def rollback_task(self, request, task_id, *args, **kwargs): """ 撤销入库任务并回滚相关状态 """ try: # 获取任务实例并锁定数据库记录 task = ContainerWCSModel.objects.select_for_update().get(taskid=task_id) container_code = task.container target_location = task.target_location batch = task.batch # 初始化库位分配器 allocator = LocationAllocation() # ==================== 库位状态回滚 ==================== # 解析目标库位信息(格式:仓库代码-行-列-层) try: warehouse_code, row, col, layer = target_location.split('-') location = LocationModel.objects.get( warehouse_code=warehouse_code, row=int(row), col=int(col), layer=int(layer) ) # 回滚库位状态到可用状态 allocator.update_location_status(location.location_code, 'available') # 更新库位组状态(需要根据实际逻辑实现) allocator.update_location_group_status(location.location_code) # 解除库位与托盘的关联 allocator.update_location_container_link(location.location_code, None) # 清除库位组的批次关联 allocator.update_location_group_batch(location, None) except (ValueError, LocationModel.DoesNotExist) as e: logger.error(f"库位解析失败: {str(e)}") raise Exception("关联库位信息无效") # ==================== 批次状态回滚 ==================== if batch: # 将批次状态恢复为未处理状态(假设原状态为1) allocator.update_batch_status(batch.bound_number, '1') # ==================== 容器状态回滚 ==================== container_obj = ContainerListModel.objects.get(container_code=container_code) # 恢复容器详细状态为初始状态(假设原状态为1) allocator.update_container_detail_status(container_code, 1) # 恢复容器的目标位置为当前所在位置 container_obj.target_location = task.current_location container_obj.save() # ==================== 删除任务记录 ==================== task.delete() # ==================== 其他关联清理 ==================== # 如果有其他关联数据(如inport_update_task的操作),在此处添加清理逻辑 return Response( {'code': '200', 'message': '任务回滚成功', 'data': None}, status=status.HTTP_200_OK ) except ContainerWCSModel.DoesNotExist: logger.warning(f"任务不存在: {task_id}") return Response( {'code': '404', 'message': '任务不存在', 'data': None}, status=status.HTTP_404_NOT_FOUND ) except Exception as e: logger.error(f"任务回滚失败: {str(e)}", exc_info=True) return Response( {'code': '500', 'message': '服务器内部错误', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) class ContainerWCSViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ authentication_classes = [] # 禁用所有认证类 permission_classes = [AllowAny] # 允许任意访问 def get_container_wcs(self, request, *args, **kwargs): data = self.request.data container = data.get('container_number') current_location = data.get('current_location') logger.info(f"请求托盘:{container},请求位置:{current_location}") data_return = {} try: container_obj = ContainerListModel.objects.filter(container_code=container).first() if not container_obj: data_return = { 'code': '400', 'message': '托盘编码不存在', 'data': data } return Response(data_return, status=status.HTTP_400_BAD_REQUEST) # 更新容器数据(部分更新) serializer = ContainerListPostSerializer( container_obj, data=data, partial=True # 允许部分字段更新 ) serializer.is_valid(raise_exception=True) serializer.save() # 检查是否已在目标位置 if current_location == str(container_obj.target_location) and current_location!= '203' and current_location!= '103': logger.info(f"托盘 {container} 已在目标位置") data_return = { 'code': '200', 'message': '当前位置已是目标位置', 'data': data } else: current_task = ContainerWCSModel.objects.filter( container=container, tasktype='inbound', working = 1, ).exclude(status=300).first() if current_task: data_return = { 'code': '200', 'message': '任务已存在,重新下发', 'data': current_task.to_dict() } else: # todo: 这里的入库操作记录里面的记录的数量不对 location_min_value,allocation_target_location, batch_info = AllocationService.allocate(container, current_location) batch_id = batch_info['number'] if batch_info['class'] == 2: self.generate_task_no_batch(container, current_location, allocation_target_location,batch_id,location_min_value.c_number) self.generate_container_operate_no_batch(container_obj, batch_id, allocation_target_location) elif batch_info['class'] == 3: self.generate_task_no_batch(container, current_location, allocation_target_location,batch_id,location_min_value.c_number) self.generate_container_operate_no_batch(container_obj, batch_id, allocation_target_location) else: self.generate_task(container, current_location, allocation_target_location,batch_id,location_min_value.c_number) # 生成任务 self.generate_container_operate(container_obj, batch_id, allocation_target_location) current_task = ContainerWCSModel.objects.get( container=container, tasktype='inbound', working=1, ) data_return = { 'code': '200', 'message': '任务下发成功', 'data': current_task.to_dict() } container_obj.target_location = allocation_target_location container_obj.save() if batch_info['class'] == 1 or batch_info['class'] == 3: self.inport_update_task(current_task.id, container_obj.id) http_status = status.HTTP_200_OK if data_return['code'] == '200' else status.HTTP_400_BAD_REQUEST return Response(data_return, status=http_status) except Exception as e: logger.error(f"处理请求时发生错误: {str(e)}", exc_info=True) return Response( {'code': '500', 'message': '服务器内部错误', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @transaction.atomic def generate_container_operate(self, container_obj, bound_number,allocation_target_location): batch_obj = BoundBatchModel.objects.filter(bound_number=bound_number).first() ContainerOperationModel.objects.create( month = int(timezone.now().strftime("%Y%m")), container = container_obj, goods_code = batch_obj.goods_code, goods_desc = batch_obj.goods_desc, operation_type ="inbound", batch_id = batch_obj.id, goods_qty = batch_obj.goods_qty, goods_weight = batch_obj.goods_qty, from_location = container_obj.current_location, to_location= allocation_target_location, timestamp=timezone.now(), operator="WMS", memo=f"WCS入库: 批次: {bound_number}, 数量: {batch_obj.goods_qty}" ) @transaction.atomic def generate_container_operate_no_batch(self, container_obj, bound_number,allocation_target_location): ContainerOperationModel.objects.create( month = int(timezone.now().strftime("%Y%m")), container = container_obj, goods_code = 'container', goods_desc = '托盘组', operation_type ="inbound", goods_qty = 1, goods_weight = 0, from_location = container_obj.current_location, to_location= allocation_target_location, timestamp=timezone.now(), memo=f"WCS入库: 批次: {bound_number}, 数量: 1" ) @transaction.atomic def generate_task(self, container, current_location, target_location,batch_id,location_c_number): batch = BoundBatchModel.objects.filter(bound_number=batch_id).first() batch_detail = BoundDetailModel.objects.filter(bound_batch=batch).first() if not batch: logger.error(f"批次号 {batch_id} 不存在") return False data_tosave = { 'container': container, 'batch': batch, 'batch_number': batch_id, 'batch_out': None, 'bound_list': batch_detail.bound_list, 'sequence': 1, 'order_number' :location_c_number, 'priority': 1, 'current_location': current_location, 'month': timezone.now().strftime('%Y%m'), 'target_location': target_location, 'tasktype': 'inbound', 'status': 103, 'is_delete': False } # 生成唯一递增的 taskid last_task = ContainerWCSModel.objects.filter( month=data_tosave['month'], ).order_by('-tasknumber').first() if last_task: number_id = last_task.tasknumber + 1 new_id = f"{number_id:05d}" else: new_id = "00001" number_id = f"{data_tosave['month']}{new_id}" data_tosave['taskid'] = f"inbound-{data_tosave['month']}-{new_id}" logger.info(f"生成入库任务: {data_tosave['taskid']}") # 每月生成唯一递增的 taskNumber data_tosave['tasknumber'] = number_id ContainerWCSModel.objects.create(**data_tosave) def generate_task_no_batch(self, container, current_location, target_location,batch_id,location_c_number): data_tosave = { 'container': container, 'batch': None, 'batch_number': batch_id, 'batch_out': None, 'bound_list': None, 'sequence': 1, 'order_number' :location_c_number, 'priority': 1, 'current_location': current_location, 'month': timezone.now().strftime('%Y%m'), 'target_location': target_location, 'tasktype': 'inbound', 'status': 103, 'is_delete': False } # 生成唯一递增的 taskid last_task = ContainerWCSModel.objects.filter( month=data_tosave['month'], ).order_by('-tasknumber').first() if last_task: number_id = last_task.tasknumber + 1 new_id = f"{number_id:05d}" else: new_id = "00001" number_id = f"{data_tosave['month']}{new_id}" data_tosave['taskid'] = f"inbound-{data_tosave['month']}-{new_id}" logger.info(f"生成入库任务: {data_tosave['taskid']}") # 每月生成唯一递增的 taskNumber data_tosave['tasknumber'] = number_id ContainerWCSModel.objects.create(**data_tosave) def update_container_wcs(self, request, *args, **kwargs): data = self.request.data logger.info(f"请求托盘:{data.get('container_number')}, 请求位置:{data.get('current_location')}, 任务号:{data.get('taskNumber')}") try: # 前置校验 container_obj, error_response = self.validate_container(data) if error_response: return error_response # 更新容器数据 if not self.update_container_data(container_obj, data): return Response( {'code': '400', 'message': '数据更新失败', 'data': data}, status=status.HTTP_400_BAD_REQUEST ) # 处理位置逻辑 task = ContainerWCSModel.objects.filter( container=container_obj.container_code, tasktype='inbound' ).first() if self.is_already_at_target(container_obj, data.get('current_location')): return self.handle_target_reached(container_obj, data) elif task: data_return = { 'code': '200', 'message': '任务已存在,重新下发', 'data': task.to_dict() } return Response(data_return, status=status.HTTP_200_OK) else: return self.handle_new_allocation(container_obj, data) except Exception as e: logger.error(f"处理请求时发生错误: {str(e)}", exc_info=True) return Response({'code': '500', 'message': '服务器内部错误', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # ---------- 辅助函数 ---------- def validate_container(self, data): """验证容器是否存在""" container = data.get('container_number') container_obj = ContainerListModel.objects.filter(container_code=container).first() if not container_obj: return None, Response({ 'code': '400', 'message': '托盘编码不存在', 'data': data }, status=status.HTTP_400_BAD_REQUEST) return container_obj, None def update_container_data(self, container_obj, data): """更新容器数据""" serializer = ContainerListPostSerializer( container_obj, data=data, partial=True ) if serializer.is_valid(): serializer.save() return True return False def is_already_at_target(self, container_obj, current_location): """检查是否已在目标位置""" print (current_location) print (str(container_obj.target_location)) return current_location == str(container_obj.target_location) def handle_target_reached(self, container_obj, data): """处理已到达目标位置的逻辑""" logger.info(f"托盘 {container_obj.container_code} 已在目标位置") task = self.get_task_by_tasknumber(data) self.update_pressure_values(task, container_obj) if task.working == 1: alloca = LocationAllocation() alloca.update_batch_goods_in_location_qty(container_obj.container_code, 1) task = self.process_task_completion(data) if not task: return Response({'code': '400', 'message': '任务不存在', 'data': data}, status=status.HTTP_400_BAD_REQUEST) if task and task.tasktype == 'inbound': self.update_storage_system(container_obj) if task and task.tasktype == 'outbound' and task.status == 300: success = self.handle_outbound_completion(container_obj, task) if not success: return Response({'code': '500', 'message': '出库状态更新失败', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) OutboundService.process_next_task() return Response({ 'code': '200', 'message': '当前位置已是目标位置', 'data': data }, status=status.HTTP_200_OK) def get_task_by_tasknumber(self, data): taskNumber = data.get('taskNumber') + 20000000000 task = ContainerWCSModel.objects.filter(tasknumber=taskNumber).first() if task: return task else: return None def process_task_completion(self, data): """处理任务完成状态""" taskNumber = data.get('taskNumber') + 20000000000 task = ContainerWCSModel.objects.filter(tasknumber=taskNumber).first() if task: task.status = 300 task.message = '任务已完成' task.working = 0 task.save() return task def update_pressure_values(self, task, container_obj): """更新压力值计算""" if task and task.tasktype in ['inbound']: base_location_obj = base_location.objects.get(id=1) layer = int(container_obj.target_location.split('-')[-1]) pressure_field = f"layer{layer}_pressure" logger.info(f"更新压力值,压力字段:{pressure_field}") current_pressure = getattr(base_location_obj, pressure_field, 0) updated_pressure = max(current_pressure - task.working, 0) setattr(base_location_obj, pressure_field, updated_pressure) base_location_obj.save() def update_storage_system(self, container_obj): """更新仓储系统状态""" allocator = LocationAllocation() location_code = self.get_location_code(container_obj.target_location) # 链式更新操作 update_operations = [ (allocator.update_location_status, location_code, 'occupied'), (allocator.update_location_container_link, location_code, container_obj.container_code), (allocator.update_container_detail_status, container_obj.container_code, 2) ] for func, *args in update_operations: if not func(*args): logger.error(f"操作失败: {func.__name__}") return False return True def get_location_code(self, target_location): """从目标位置解析获取位置编码""" parts = target_location.split('-') coordinate = f"{int(parts[1])}-{int(parts[2])}-{int(parts[3])}" return LocationModel.objects.filter(coordinate=coordinate).first().location_code def handle_new_allocation(self, container_obj, data): """处理新库位分配逻辑""" allocator = LocationAllocation() container_code = container_obj.container_code # 获取并验证库位分配 location = allocator.get_location_by_status(container_code, data.get('current_location')) if not location or not self.perform_initial_allocation(allocator, location, container_code): return Response({'code': '400', 'message': '库位分配失败', 'data': data}, status=status.HTTP_400_BAD_REQUEST) # 生成目标位置并更新容器 target_location = self.generate_target_location(location) container_obj.target_location = target_location container_obj.save() # 创建任务并返回响应 task = self.create_inbound_task(container_code, data, target_location, location) return Response({ 'code': '200', 'message': '任务下发成功', 'data': task.to_dict() }, status=status.HTTP_200_OK) def perform_initial_allocation(self, allocator, location, container_code): """执行初始库位分配操作""" operations = [ (allocator.update_location_status, location.location_code, 'reserved'), (allocator.update_location_group_status, location.location_code), (allocator.update_batch_status, container_code, '2'), (allocator.update_location_group_batch, location, container_code), (allocator.update_location_container_link, location.location_code, container_code), (allocator.update_container_detail_status, container_code, 2) ] for func, *args in operations: if not func(*args): logger.error(f"分配操作失败: {func.__name__}") return False return True def generate_target_location(self, location): """生成目标位置字符串""" return ( f"{location.warehouse_code}-" f"{int(location.row):02d}-" f"{int(location.col):02d}-" f"{int(location.layer):02d}" ) def create_inbound_task(self, container_code, data, target_location, location): """创建入库任务""" batch_id = LocationAllocation().get_batch(container_code) self.generate_task( container_code, data.get('current_location'), target_location, batch_id, location.c_number ) task = ContainerWCSModel.objects.get(container=container_code, tasktype='inbound') self.inport_update_task(task.id, container_code) return task @transaction.atomic def inport_update_task(self, wcs_id,container_id): try: task_obj = ContainerWCSModel.objects.filter(id=wcs_id).first() if task_obj: container_detail_obj = ContainerDetailModel.objects.filter(container=container_id).all() if container_detail_obj: for detail in container_detail_obj: # 保存到数据库 batch = BoundDetailModel.objects.filter(bound_batch_id=detail.batch.id).first() TaskModel.objects.create( task_wcs = task_obj, container_detail = detail, batch_detail = batch ) logger.info(f"入库任务 {wcs_id} 已更新") else: logger.info(f"入库任务 {container_id} 批次不存在") else: logger.info(f"入库任务 {wcs_id} 不存在") except Exception as e: logger.error(f"处理入库任务时发生错误: {str(e)}", exc_info=True) return Response( {'code': '500', 'message': '服务器内部错误', 'data': None}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def handle_outbound_completion(self, container_obj, task): """处理出库完成后的库位释放和状态更新""" try: allocator = LocationAllocation() location_task = task.current_location location_row = location_task.split('-')[1] location_col = location_task.split('-')[2] location_layer = location_task.split('-')[3] location= LocationModel.objects.filter(row=location_row, col=location_col, layer=location_layer).first() location_code = location.location_code # 事务确保原子性 with transaction.atomic(): # 解除库位与托盘的关联 if not allocator.release_location(location_code): raise Exception("解除库位关联失败") # 更新库位状态为可用 if not allocator.update_location_status(location_code, 'available'): raise Exception("库位状态更新失败") # 更新库位组的统计信息 self.handle_group_location_status(location_code, location.location_group) # 更新容器状态为已出库(假设状态3表示已出库) container_obj.status = 3 container_obj.save() return True except Exception as e: logger.error(f"出库完成处理失败: {str(e)}") return False def handle_group_location_status(self,location_code,location_group): """ 处理库位组和库位的关联关系 :param location_code: 库位编码 :param location_group: 库位组编码 :return: """ # 1. 获取库位空闲状态的库位数目 location_obj_number = LocationModel.objects.filter( location_group=location_group, status='available' ).all().count() # 2. 获取库位组对象 logger.info(f"库位组 {location_group} 下的库位数目:{location_obj_number}") # 1. 获取库位和库位组的关联关系 location_group_obj = LocationGroupModel.objects.filter( group_code=location_group ).first() if not location_group_obj: logger.info(f"库位组 {location_group} 不存在") return None else: if location_obj_number == 0: # 库位组库位已满,更新库位组状态为full location_group_obj.status = 'full' location_group_obj.save() elif location_obj_number < location_group_obj.max_capacity: location_group_obj.status = 'occupied' location_group_obj.save() else: location_group_obj.status = 'available' location_group_obj.current_batch = '' location_group_obj.current_goods_code = '' location_group_obj.save() # PDA组盘入库 将扫描到的托盘编码和批次信息保存到数据库 # 1. 先查询托盘对象,如果不存在,则创建托盘对象 # 2. 循环处理每个批次,查询批次对象, # 3. 更新批次数据(根据业务规则) # 4. 保存到数据库 # 5. 保存操作记录到数据库 class ContainerDetailViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = ContainerDetailFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return ContainerDetailModel.objects.filter( is_delete=False) else: return ContainerDetailModel.objects.filter( id=id, is_delete=False) else: return ContainerDetailModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return ContainerDetailGetSerializer elif self.action in ['create', 'update']: return ContainerDetailPostSerializer else: return self.http_method_not_allowed(request=self.request) def create(self, request, *args, **kwargs): data = self.request.data from .container_operate import ContainerService ContainerService.create_container_operation(data,logger=logger) # 将处理后的数据返回(或根据业务需求保存到数据库) res_data={ "code": "200", "msg": "Success Create", "data": data } return Response(res_data, status=200) def update(self, request, pk): qs = self.get_object() data = self.request.data serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=200, headers=headers) class ContainerOperateViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "timestamp" ] filter_class = ContainerOperationFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return ContainerOperationModel.objects.filter( is_delete=False) else: return ContainerOperationModel.objects.filter( id=id, is_delete=False) else: return ContainerOperationModel.objects.none() def get_serializer_class(self): if self.action in ['list', 'destroy','retrieve']: return ContainerOperationGetSerializer elif self.action in ['create', 'update']: return ContainerOperationPostSerializer else: return self.http_method_not_allowed(request=self.request) def create(self, request, *args, **kwargs): data = self.request.data serializer = self.get_serializer(data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=200, headers=headers) def update(self, request, pk): qs = self.get_object() data = self.request.data serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=200, headers=headers) class OutboundService: @staticmethod def generate_task_id(): """生成唯一任务ID(格式: outbound-年月-顺序号)""" month = timezone.now().strftime("%Y%m") last_task = ContainerWCSModel.objects.filter( tasktype='outbound', month=int(month) ).order_by('-sequence').first() sequence = last_task.sequence + 1 if last_task else 1 return f"outbound-{month}-{sequence:05d}" @staticmethod def send_task_to_wcs(task): """异步发送任务到WCS(非阻塞版本)""" # 提取任务关键数据用于线程(避免直接传递ORM对象) task_data = { 'task_id': task.pk, # 使用主键而不是对象 'send_data': { "code":'200', "message": task.message, "data":{ "taskid": task.taskid, "container": task.container, "current_location": task.current_location, "target_location": task.target_location, "tasktype": task.tasktype, "month": task.month, "message": task.message, "status": task.status, "taskNumber": task.tasknumber-20000000000, "order_number":task.order_number, "sequence":task.sequence } } } # 创建并启动线程 thread = threading.Thread( target=OutboundService._async_send_handler, kwargs=task_data, daemon=True # 守护线程(主程序退出时自动终止) ) thread.start() return True # 立即返回表示已开始处理 @staticmethod def _async_send_handler(task_id, send_data): """异步处理的实际工作函数""" try: # 每个线程需要独立的数据库连接 close_old_connections() # 重新获取任务对象(确保使用最新数据) task = ContainerWCSModel.objects.get(pk=task_id) # 发送第一个请求(不处理结果) requests.post( "http://127.0.0.1:8008/container/batch/", json=send_data, timeout=10 ) # 发送关键请求 response = requests.post( "http://192.168.18.67:1616/wcs/WebApi/getOutTask", json=send_data, timeout=10 ) # 处理响应 if response.status_code == 200: task.status = 200 task.save() logger.info(f"任务 {task.taskid} 已发送") else: logger.error(f"WCS返回错误: {response.text}") except Exception as e: logger.error(f"发送失败: {str(e)}") finally: close_old_connections() # 清理数据库连接 @staticmethod def create_initial_tasks(container_list,bound_list_id): """生成初始任务队列""" with transaction.atomic(): current_WCS = ContainerWCSModel.objects.filter(tasktype='outbound',bound_list_id = bound_list_id).first() if current_WCS: logger.error(f"当前{bound_list_id}已有出库任务") return False tasks = [] start_sequence = ContainerWCSModel.objects.filter(tasktype='outbound').count() + 1 tasknumber = ContainerWCSModel.objects.filter().count() tasknumber_index = 1 for index, container in enumerate(container_list, start=start_sequence): container_obj = ContainerListModel.objects.filter(id =container['container_number']).first() if container_obj.current_location != container_obj.target_location: logger.error(f"托盘 {container_obj.container_code} 未到达目的地,不生成任务") return False OutBoundDetail_obj = OutBoundDetailModel.objects.filter(bound_list=bound_list_id,bound_batch_number_id=container['batch_id']).first() if not OutBoundDetail_obj: logger.error(f"批次 {container['batch_id']} 不存在") return False month = int(timezone.now().strftime("%Y%m")) task = ContainerWCSModel( taskid=OutboundService.generate_task_id(), batch = OutBoundDetail_obj.bound_batch_number, batch_out = OutBoundDetail_obj.bound_batch, bound_list = OutBoundDetail_obj.bound_list, sequence=index, order_number = container['location_c_number'], priority=100, tasknumber = month*100000+tasknumber_index+tasknumber, container=container_obj.container_code, current_location=container_obj.current_location, target_location="203", tasktype="outbound", month=int(timezone.now().strftime("%Y%m")), message="等待出库", status=100, ) tasknumber_index += 1 tasks.append(task) container_obj = ContainerListModel.objects.filter(container_code=task.container).first() container_obj.target_location = task.target_location container_obj.save() ContainerWCSModel.objects.bulk_create(tasks) logger.info(f"已创建 {len(tasks)} 个初始任务") @staticmethod def insert_new_tasks(new_tasks): """动态插入新任务并重新排序""" with transaction.atomic(): pending_tasks = list(ContainerWCSModel.objects.select_for_update().filter(status=100)) # 插入新任务 for new_task_data in new_tasks: new_task = ContainerWCSModel( taskid=OutboundService.generate_task_id(), priority=new_task_data.get('priority', 100), container=new_task_data['container'], current_location=new_task_data['current_location'], target_location=new_task_data.get('target_location', 'OUT01'), tasktype="outbound", month=int(timezone.now().strftime("%Y%m")), message="等待出库", status=100, ) # 找到插入位置 insert_pos = 0 for i, task in enumerate(pending_tasks): if new_task.priority < task.priority: insert_pos = i break else: insert_pos = len(pending_tasks) pending_tasks.insert(insert_pos, new_task) # 重新分配顺序号 for i, task in enumerate(pending_tasks, start=1): task.sequence = i if task.pk is None: task.save() else: task.save(update_fields=['sequence']) logger.info(f"已插入 {len(new_tasks)} 个新任务") @staticmethod def process_next_task(): """处理下一个任务""" next_task = ContainerWCSModel.objects.filter(status=100).order_by('sequence').first() if not next_task: logger.info("没有待处理任务") return allocator = LocationAllocation() OutboundService.perform_initial_allocation(allocator, next_task.current_location) OutboundService.send_task_to_wcs(next_task) def perform_initial_allocation(allocator, location): """执行初始库位分配操作""" location_row = location.split('-')[1] location_col = location.split('-')[2] location_layer = location.split('-')[3] location_code = LocationModel.objects.filter(row=location_row, col=location_col, layer=location_layer).first().location_code if not location_code: logger.error(f"未找到库位: {location}") operations = [ (allocator.update_location_status,location_code, 'reserved'), (allocator.update_location_group_status,location_code) ] for func, *args in operations: if not func(*args): logger.error(f"分配操作失败: {func.__name__}") return False return True class OutTaskViewSet(APIView): """ # fun:get_out_task:下发出库任务 # fun:get_batch_count_by_boundlist:获取出库申请下的批次数量 # fun:generate_location_by_demand:根据出库需求生成出库任务 """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 def post(self, request): try: data = self.request.data logger.info(f"收到 WMS 推送数据: {data}") # 假设从请求中获取 bound_list_id bound_list_id = data.get('bound_list_id') batch_count = self.get_batch_count_by_boundlist(bound_list_id) logger.info(f"出库批次数量: {batch_count}") # 获取需要出库的托盘列表 generate_result = self.generate_location_by_demand(batch_count,bound_list_id) if generate_result['code'] != '200': current_WCS = ContainerWCSModel.objects.filter(tasktype='outbound',bound_list_id = bound_list_id).first() if current_WCS: OutboundService.process_next_task() return Response({"code": "200", "msg": "Success 再次发送任务"}, status=200) return Response(generate_result, status=500) container_list = generate_result['data'] logger.info(f"生成出库任务: {container_list}") # 2. 生成初始任务 OutboundService.create_initial_tasks(container_list,bound_list_id) # 3. 立即发送第一个任务 OutboundService.process_next_task() return Response({"code": "200", "msg": "Success"}, status=200) except Exception as e: logger.error(f"任务生成失败: {str(e)}") return Response({"code": "500", "msg": str(e)}, status=500) # 获取出库需求 def get_batch_count_by_boundlist(self,bound_list_id): try: bound_list_obj_all = OutBoundDetailModel.objects.filter(bound_list=bound_list_id).all() if bound_list_obj_all: batch_count_dict = {} # 统计批次数量(创建哈希表,去重) for batch in bound_list_obj_all: if batch.bound_batch_number_id not in batch_count_dict: batch_count_dict[batch.bound_batch_number_id] = batch.bound_batch.goods_out_qty else: batch_count_dict[batch.bound_batch_number_id] += batch.bound_batch.goods_out_qty return batch_count_dict else: logger.error(f"查询批次数量失败: {bound_list_id} 不存在") return {} except Exception as e: logger.error(f"查询批次数量失败: {str(e)}") return {} def get_location_by_status_and_batch(self,status,bound_id): try: container_obj = ContainerDetailModel.objects.filter(batch=bound_id,status=status).all() if container_obj: container_dict = {} # 统计托盘数量(创建哈希表,去重) for obj in container_obj: if obj.container_id not in container_dict: container_dict[obj.container_id] = obj.goods_qty else: container_dict[obj.container_id] += obj.goods_qty return container_dict else: logger.error(f"查询{status}状态的批次数量失败: {bound_id} 不存在") return {} except Exception as e: logger.error(f"查询{status}状态的批次数量失败: {str(e)}") return {} def get_order_by_batch(self,container_list,bound_id): try: container_dict = {} for container in container_list: location_container = LocationContainerLink.objects.filter(container_id=container,is_active=True).first() if location_container: location_c_number = location_container.location.c_number if container not in container_dict: container_dict[container] = { "container_number":container, "location_c_number":location_c_number, "location_id ":location_container.location.id, "location_type":location_container.location.location_type, "batch_id":bound_id, } if len(container_dict.keys()) == len(container_list): return container_dict else: logger.error(f"查询批次数量失败: {container_list} 不存在") return {} except Exception as e: logger.error(f"查询批次数量失败: {str(e)}") return {} except Exception as e: logger.error(f"查询{status}状态的批次数量失败: {str(e)}") return {} def generate_location_by_demand(self,demand_list,bound_list_id): # demand_list {1: 25, 2: 17} try: return_location =[] for demand_id, demand_qty in demand_list.items(): container_list = self.get_location_by_status_and_batch(2, demand_id) if not container_list: return {"code": "500", "msg": f"批次 {demand_id} 不存在"} container_id_list = container_list.keys() container_order = self.get_order_by_batch(container_id_list,demand_id) if not container_order: return {"code": "500", "msg": f"托盘 {container_id_list} 不存在"} order = sorted( container_order.values(), key=lambda x: ( int(x['location_type'][-1]), # 提取最后一位数字并转为整数 -x['location_c_number'] # 按location_c_number降序 ) ) current_qty = 0 for container in order: container_detail_obj = ContainerDetailModel.objects.filter(container_id=container['container_number'],batch_id=demand_id,status=2).all() container_obj = ContainerListModel.objects.filter(id=container['container_number']).first() if not container_obj: return {"code": "500", "msg": f"托盘 {container['container_number']} 不存在"} if not container_detail_obj: return {"code": "500", "msg": f"托盘上无该批次,请检查{container['container_number']} 不存在"} goods_qty = 0 for obj in container_detail_obj: goods_qty += obj.goods_qty if current_qty < demand_qty: now_qty = current_qty current_qty += goods_qty return_location.append(container) logger.info(f"批次 {demand_id} 托盘 {container['container_number']} 当前数量 {current_qty}") self.create_or_update_container_operation(container_obj,demand_id,bound_list_id,203,min(demand_qty-now_qty,goods_qty),min(demand_qty-now_qty,goods_qty)) self.update_container_detail_out_qty(container_obj,demand_id) else: break return {"code": "200", "msg": "Success", "data": return_location} except Exception as e: return {"code": "500", "msg": str(e)} def create_or_update_container_operation(self,container_obj,batch_id,bound_id,to_location,goods_qty,goods_weight): try: container_operation_obj = ContainerOperationModel.objects.filter(container=container_obj,batch_id=batch_id,bound_id=bound_id,operation_type="outbound").first() if container_operation_obj: logger.info(f"[0]查询出库任务: {container_operation_obj.operation_type} ") logger.info(f"更新出库任务: {container_obj.container_code} 批次 {batch_id} 出库需求: {bound_id} 数量: {goods_qty} 重量: {goods_weight}") container_operation_obj.to_location = to_location container_operation_obj.goods_qty = goods_qty container_operation_obj.goods_weight = goods_weight container_operation_obj.save() else: logger.info(f"创建出库任务: {container_obj.container_code} 批次 {batch_id} 出库需求: {bound_id} 数量: {goods_qty} 重量: {goods_weight}") batch = BoundBatchModel.objects.filter(id=batch_id).first() if not batch: return {"code": "500", "msg": f"批次 {batch_id} 不存在"} ContainerOperationModel.objects.create( month = int(timezone.now().strftime("%Y%m")), container = container_obj, goods_code = batch.goods_code, goods_desc = batch.goods_desc, operation_type ="outbound", batch_id = batch_id, bound_id = bound_id, goods_qty = goods_qty, goods_weight = goods_weight, from_location = container_obj.current_location, to_location= to_location, timestamp=timezone.now(), operator="WMS", memo=f"出库需求: {bound_id}, 批次: {batch_id}, 数量: {goods_qty}" ) return {"code": "200", "msg": "Success"} except Exception as e: return {"code": "500", "msg": str(e)} def update_container_detail_out_qty(self,container_obj,batch_id): try: logger.info(f"[1]更新托盘出库数量: {container_obj.container_code} 批次 {batch_id} ") container_operation_obj = ContainerOperationModel.objects.filter(container=container_obj,batch_id=batch_id,operation_type="outbound").all() if not container_operation_obj: logger.error(f"[1]批次 {batch_id} 托盘 {container_obj.container_code} 无出库任务") return {"code": "500", "msg": f"批次 {batch_id} 托盘 {container_obj.container_code} 无出库任务"} container_detail_obj = ContainerDetailModel.objects.filter(container=container_obj,batch_id=batch_id,status=2).first() if not container_detail_obj: logger.error(f"[1]批次 {batch_id} 托盘 {container_obj.container_code} 无批次信息") return {"code": "500", "msg": f"批次 {batch_id} 托盘 {container_obj.container_code} 无批次信息"} out_qty = 0 for obj in container_operation_obj: out_qty += obj.goods_qty if out_qty >= container_detail_obj.goods_qty: out_qty = container_detail_obj.goods_qty container_detail_obj.status = 3 break if out_qty == 0: logger.error(f"[1]批次 {batch_id} 托盘 {container_obj.container_code} 无出库数量") return {"code": "500", "msg": f"批次 {batch_id} 托盘 {container_obj.container_code} 无出库数量"} container_detail_obj.goods_out_qty = out_qty container_detail_obj.save() return {"code": "200", "msg": "Success"} except Exception as e: return {"code": "500", "msg": str(e)} class BatchViewSet(viewsets.ModelViewSet): authentication_classes = [] # 禁用所有认证类 permission_classes = [AllowAny] # 允许任意访问 def wcs_post(self, request, *args, **kwargs): data = self.request.data logger.info(f"收到 WMS 推送数据: {data}") return Response({"code": "200", "msg": "Success"}, status=200)