12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301 |
- from rest_framework import viewsets
- from utils.page import MyPageNumberPagination
- from utils.datasolve import sumOfList, transportation_calculate
- from utils.md5 import Md5
- from rest_framework.filters import OrderingFilter
- from django_filters.rest_framework import DjangoFilterBackend
- from rest_framework.response import Response
- from rest_framework.exceptions import APIException
- from django.utils import timezone
- import requests
- import json
- from django.conf import settings
- from django.db import transaction
- import logging
- from rest_framework import status
- from .models import ContainerListModel,ContainerDetailModel,ContainerOperationModel,ContainerWCSModel,TaskModel
- from bound.models import BoundBatchModel,BoundDetailModel,BoundListModel,OutBoundDetailModel
- from bin.views import LocationAllocation,base_location
- from bin.models import LocationModel,LocationContainerLink
- # from .files import FileListRenderCN, FileDetailRenderCN
- from .serializers import ContainerDetailGetSerializer,ContainerDetailPostSerializer
- from .serializers import ContainerListGetSerializer,ContainerListPostSerializer
- from .serializers import ContainerOperationGetSerializer,ContainerOperationPostSerializer
- from .serializers import TaskGetSerializer,TaskPostSerializer
- from .filter import ContainerDetailFilter,ContainerListFilter,ContainerOperationFilter,TaskFilter
- # 以后添加模
- from warehouse.models import ListModel as warehouse
- from staff.models import ListModel as staff
- from rest_framework.permissions import AllowAny
- import threading
- from django.db import close_old_connections
- logger = logging.getLogger(__name__)
class ContainerListViewSet(viewsets.ModelViewSet):
    """
        retrieve:
            Response a data list(get)
        list:
            Response a data list(all)
        create:
            Create a data line(post)
        delete:
            Delete a data line(delete)
    """
    # authentication_classes = []  # disable all authentication classes
    # permission_classes = [AllowAny]  # allow any access

    pagination_class = MyPageNumberPagination
    filter_backends = [DjangoFilterBackend, OrderingFilter, ]
    ordering_fields = ['id', "create_time", "update_time", ]
    filter_class = ContainerListFilter

    def get_project(self):
        """Return the ``pk`` URL keyword argument, or ``None`` when unavailable."""
        try:
            return self.kwargs.get('pk')
        except AttributeError:
            # ``kwargs`` has not been attached to the view.
            return None

    def get_queryset(self):
        """Return all containers, or only the one matching the URL ``pk``."""
        pk = self.get_project()
        if not self.request.user:
            return ContainerListModel.objects.none()
        if pk is None:
            return ContainerListModel.objects.filter()
        return ContainerListModel.objects.filter(id=pk)

    def get_serializer_class(self):
        """Pick the read or write serializer for the current action."""
        if self.action in ['list', 'destroy', 'retrieve']:
            return ContainerListGetSerializer
        elif self.action in ['create', 'update']:
            return ContainerListPostSerializer
        else:
            return self.http_method_not_allowed(request=self.request)

    def create(self, request, *args, **kwargs):
        """Create a container, stamping ``month`` and ``last_operate``."""
        # ``request.data`` may be an immutable QueryDict (form or multipart
        # submissions); add the server-side fields to a mutable copy instead.
        data = request.data.copy()
        now = timezone.now()
        data['month'] = now.strftime('%Y%m')
        data['last_operate'] = str(now)

        serializer = self.get_serializer(data=data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=200, headers=headers)

    def update(self, request, pk=None, *args, **kwargs):
        """
        Replace the container selected by ``pk`` with the submitted data.

        Extra keyword arguments (for example the ``partial`` flag that DRF's
        ``partial_update`` forwards) are accepted so the call no longer fails
        with a ``TypeError``; ``get_serializer_class`` still decides which
        actions are allowed.
        """
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=200, headers=headers)
class TaskViewSet(viewsets.ModelViewSet):
    """
        retrieve:
            Response a data list(get)
        list:
            Response a data list(all)
        create:
            Create a data line(post)
        delete:
            Delete a data line(delete)
    """
    pagination_class = MyPageNumberPagination
    filter_backends = [DjangoFilterBackend, OrderingFilter, ]
    ordering_fields = ['id', "create_time", "update_time", ]
    filter_class = TaskFilter

    def get_project(self):
        """Return the ``pk`` URL keyword argument, or ``None`` when unavailable."""
        try:
            return self.kwargs.get('pk')
        except AttributeError:
            # ``kwargs`` has not been attached to the view.
            return None

    def get_queryset(self):
        """Return all tasks, or only the one matching the URL ``pk``."""
        pk = self.get_project()
        if not self.request.user:
            return TaskModel.objects.none()
        if pk is None:
            return TaskModel.objects.filter()
        return TaskModel.objects.filter(id=pk)

    def get_serializer_class(self):
        """Pick the read or write serializer for the current action."""
        if self.action in ['list', 'destroy', 'retrieve']:
            return TaskGetSerializer
        elif self.action in ['create', 'update']:
            return TaskPostSerializer
        else:
            return self.http_method_not_allowed(request=self.request)

    def create(self, request, *args, **kwargs):
        """
        Echo the submitted payload back without persisting anything.

        The previous implementation passed an undefined ``headers`` name to
        ``Response`` and therefore raised ``NameError`` on every request.
        """
        return Response(request.data, status=200)

    def update(self, request, pk=None, *args, **kwargs):
        """
        Replace the task selected by ``pk`` with the submitted data.

        Extra keyword arguments (for example the ``partial`` flag that DRF's
        ``partial_update`` forwards) are accepted so the call no longer fails
        with a ``TypeError``; ``get_serializer_class`` still decides which
        actions are allowed.
        """
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=200, headers=headers)
class TaskRollbackMixin:
    """Mixin that adds an endpoint for undoing an inbound WCS task."""

    def rollback_task(self, request, task_id, *args, **kwargs):
        """
        Cancel an inbound task and roll back the state that was changed for it.

        Args:
            request: The incoming request (not inspected here).
            task_id: ``taskid`` of the ``ContainerWCSModel`` row to undo.

        Returns:
            A ``Response`` with code 200 on success, 404 when the task does
            not exist, and 500 for any other failure.
        """
        try:
            # The atomic block sits inside the ``try`` on purpose: an error
            # must leave the block (so the database transaction is rolled
            # back) before it is turned into an error response. Catching the
            # error inside the atomic scope would commit the partial changes.
            with transaction.atomic():
                self._undo_inbound_task(task_id)
        except ContainerWCSModel.DoesNotExist:
            logger.warning(f"任务不存在: {task_id}")
            return Response(
                {'code': '404', 'message': '任务不存在', 'data': None},
                status=status.HTTP_404_NOT_FOUND
            )
        except Exception as e:
            logger.error(f"任务回滚失败: {str(e)}", exc_info=True)
            return Response(
                {'code': '500', 'message': '服务器内部错误', 'data': None},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )
        return Response(
            {'code': '200', 'message': '任务回滚成功', 'data': None},
            status=status.HTTP_200_OK
        )

    def _undo_inbound_task(self, task_id):
        """Revert location, batch and container state, then delete the task."""
        # Fetch the task and lock its row for the rest of the transaction.
        task = ContainerWCSModel.objects.select_for_update().get(taskid=task_id)
        container_code = task.container
        batch = task.batch
        allocator = LocationAllocation()

        # ---------------- location state ----------------
        # The target location is parsed as "warehouse_code-row-col-layer".
        try:
            warehouse_code, row, col, layer = task.target_location.split('-')
            location = LocationModel.objects.get(
                warehouse_code=warehouse_code,
                row=int(row),
                col=int(col),
                layer=int(layer)
            )
        except (ValueError, LocationModel.DoesNotExist) as e:
            logger.error(f"库位解析失败: {str(e)}")
            raise Exception("关联库位信息无效") from e

        # Make the location available again.
        allocator.update_location_status(location.location_code, 'available')
        # Refresh the status of the location group.
        allocator.update_location_group_status(location.location_code)
        # Detach the container from the location.
        allocator.update_location_container_link(location.location_code, None)
        # Clear the batch associated with the location group.
        allocator.update_location_group_batch(location, None)

        # ---------------- batch state ----------------
        if batch:
            # Reset the batch status (the original author assumed '1').
            allocator.update_batch_status(batch.bound_number, '1')

        # ---------------- container state ----------------
        container_obj = ContainerListModel.objects.get(container_code=container_code)
        # Reset the container detail status (the original author assumed 1).
        allocator.update_container_detail_status(container_code, 1)
        # Point the container's target back at where the task started.
        container_obj.target_location = task.current_location
        container_obj.save()

        # ---------------- task record ----------------
        task.delete()
        # Any further related data (e.g. rows created by inport_update_task)
        # would have to be cleaned up here.
class ContainerWCSViewSet(viewsets.ModelViewSet):
    """
        retrieve:
            Response a data list(get)
        list:
            Response a data list(all)
        create:
            Create a data line(post)
        delete:
            Delete a data line(delete)
    """
    authentication_classes = []  # disable every authentication class
    permission_classes = [AllowAny]  # allow any access

    # Added to the reported ``taskNumber`` before it is matched against the
    # stored ``tasknumber`` column.
    _TASKNUMBER_OFFSET = 20000000000

    def get_container_wcs(self, request, *args, **kwargs):
        """
        Handle a WCS position report and answer with the task to execute.

        Reads ``container_number`` and ``current_location`` from the payload.
        If the container is already at its target, or an inbound task already
        exists for it, that is reported; otherwise a location is allocated and
        a new inbound task is created.

        Every path returns a ``Response``; allocation failures are reported
        with a 400 instead of falling out of the view with ``None``.
        """
        data = self.request.data
        container = data.get('container_number')
        current_location = data.get('current_location')
        logger.info(f"请求托盘:{container},请求位置:{current_location}")
        try:
            container_obj, error_response = self.validate_container(data)
            if error_response:
                return error_response

            # Partial update of the container with the reported fields.
            if not self.update_container_data(container_obj, data):
                return Response({'code': '400', 'message': '数据更新失败', 'data': data},
                                status=status.HTTP_400_BAD_REQUEST)

            if self.is_already_at_target(container_obj, current_location):
                logger.info(f"托盘 {container} 已在目标位置")
                return Response({
                    'code': '200',
                    'message': '当前位置已是目标位置',
                    'data': data
                }, status=status.HTTP_200_OK)

            current_task = ContainerWCSModel.objects.filter(
                container=container,
                tasktype='inbound'
            ).first()
            if current_task:
                return Response({
                    'code': '200',
                    'message': '任务已存在,重新下发',
                    'data': current_task.to_dict()
                }, status=status.HTTP_200_OK)

            # No task yet: allocate a location and create the inbound task.
            return self.handle_new_allocation(container_obj, data)
        except Exception as e:
            logger.error(f"处理请求时发生错误: {str(e)}", exc_info=True)
            return Response(
                {'code': '500', 'message': '服务器内部错误', 'data': None},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

    @transaction.atomic
    def generate_task(self, container, current_location, target_location, batch_id, location_c_number):
        """
        Create an inbound ``ContainerWCSModel`` row for ``container``.

        Returns:
            ``True`` when the task was created, ``False`` when the batch or
            its bound detail could not be found.
        """
        batch = BoundBatchModel.objects.filter(bound_number=batch_id).first()
        if not batch:
            logger.error(f"批次号 {batch_id} 不存在")
            return False
        # Looked up only after the batch is known to exist; a missing detail
        # is reported instead of failing on ``None.bound_list``.
        batch_detail = BoundDetailModel.objects.filter(bound_batch=batch).first()
        if not batch_detail:
            logger.error(f"批次号 {batch_id} 没有关联的入库明细")
            return False

        month = timezone.now().strftime('%Y%m')

        # Derive the next sequence number from the highest ``taskid`` of the month.
        last_task = ContainerWCSModel.objects.filter(
            month=month,
        ).order_by('-taskid').first()
        if last_task:
            last_id = int(last_task.taskid.split('-')[-1])
            new_id = f"{last_id + 1:05}"
        else:
            new_id = "00001"

        taskid = f"inbound-{month}-{new_id}"
        logger.info(f"生成入库任务: {taskid}")
        ContainerWCSModel.objects.create(
            container=container,
            batch=batch,
            batch_out=None,
            bound_list=batch_detail.bound_list,
            sequence=1,
            order_number=location_c_number,
            priority=1,
            current_location=current_location,
            month=month,
            target_location=target_location,
            tasktype='inbound',
            status=103,
            is_delete=False,
            taskid=taskid,
            # Month prefix plus the same sequence number.
            tasknumber=f"{month}{new_id}",
        )
        return True

    def update_container_wcs(self, request, *args, **kwargs):
        """
        Handle a WCS status update for a container.

        Completes the task when the container has reached its target location;
        otherwise allocates a location and creates a new inbound task.
        """
        data = self.request.data
        logger.info(f"请求托盘:{data.get('container_number')}, 请求位置:{data.get('current_location')}, 任务号:{data.get('taskNumber')}")
        try:
            # Pre-validation
            container_obj, error_response = self.validate_container(data)
            if error_response:
                return error_response

            # Update the container record
            if not self.update_container_data(container_obj, data):
                return Response({'code': '400', 'message': '数据更新失败', 'data': data},
                                status=status.HTTP_400_BAD_REQUEST)

            # Location handling
            if self.is_already_at_target(container_obj, data.get('current_location')):
                return self.handle_target_reached(container_obj, data)
            return self.handle_new_allocation(container_obj, data)
        except Exception as e:
            logger.error(f"处理请求时发生错误: {str(e)}", exc_info=True)
            return Response({'code': '500', 'message': '服务器内部错误', 'data': None},
                            status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    # ---------- helpers ----------
    def validate_container(self, data):
        """Return ``(container, None)`` or ``(None, 400 response)`` if it is unknown."""
        container = data.get('container_number')
        container_obj = ContainerListModel.objects.filter(container_code=container).first()
        if not container_obj:
            return None, Response({
                'code': '400',
                'message': '托盘编码不存在',
                'data': data
            }, status=status.HTTP_400_BAD_REQUEST)
        return container_obj, None

    def update_container_data(self, container_obj, data):
        """Partially update the container; return whether validation passed."""
        serializer = ContainerListPostSerializer(
            container_obj,
            data=data,
            partial=True
        )
        if not serializer.is_valid():
            return False
        serializer.save()
        return True

    def is_already_at_target(self, container_obj, current_location):
        """Return whether ``current_location`` equals the container's target."""
        return current_location == str(container_obj.target_location)

    def handle_target_reached(self, container_obj, data):
        """Complete the reported task once the container is at its target."""
        logger.info(f"托盘 {container_obj.container_code} 已在目标位置")
        task = self.get_task_by_tasknumber(data)
        if not task:
            return Response({'code': '400', 'message': '任务不存在', 'data': data},
                            status=status.HTTP_400_BAD_REQUEST)

        # Pressure must be adjusted before completion resets ``task.working``.
        self.update_pressure_values(task, container_obj)
        task = self.process_task_completion(data)
        if not task:
            return Response({'code': '400', 'message': '任务不存在', 'data': data},
                            status=status.HTTP_400_BAD_REQUEST)

        if task.tasktype == 'inbound':
            self.update_storage_system(container_obj)
        if task.tasktype == 'outbound':
            OutboundService.process_next_task()

        return Response({
            'code': '200',
            'message': '当前位置已是目标位置',
            'data': data
        }, status=status.HTTP_200_OK)

    def _resolve_tasknumber(self, data):
        """Return the stored task number for the payload, or ``None`` if unusable."""
        raw = data.get('taskNumber')
        try:
            return int(raw) + self._TASKNUMBER_OFFSET
        except (TypeError, ValueError):
            # Missing or non-numeric value: treat it as "no such task"
            # instead of failing with a TypeError.
            logger.warning(f"任务号无效: {raw!r}")
            return None

    def get_task_by_tasknumber(self, data):
        """Return the task referenced by ``data['taskNumber']``, or ``None``."""
        tasknumber = self._resolve_tasknumber(data)
        if tasknumber is None:
            return None
        return ContainerWCSModel.objects.filter(tasknumber=tasknumber).first()

    def process_task_completion(self, data):
        """Set the referenced task to status 300 / working 0; return it (or ``None``)."""
        task = self.get_task_by_tasknumber(data)
        if task:
            task.status = 300
            task.working = 0
            task.save()
        return task

    def update_pressure_values(self, task, container_obj):
        """Subtract ``task.working`` from the pressure of the target layer (inbound only)."""
        if not task or task.tasktype not in ['inbound']:
            return
        base_location_obj = base_location.objects.get(id=1)
        # The layer is the last "-"-separated part of the target location.
        layer = int(container_obj.target_location.split('-')[-1])
        pressure_field = f"layer{layer}_pressure"
        logger.info(f"更新压力值,压力字段:{pressure_field}")
        current_pressure = getattr(base_location_obj, pressure_field, 0)
        # Never let the stored pressure drop below zero.
        updated_pressure = max(current_pressure - task.working, 0)
        setattr(base_location_obj, pressure_field, updated_pressure)
        base_location_obj.save()

    @staticmethod
    def _run_operations(operations, failure_label):
        """Call each ``(func, *args)`` in order; stop and log at the first falsy result."""
        for func, *args in operations:
            if not func(*args):
                logger.error(f"{failure_label}: {func.__name__}")
                return False
        return True

    def update_storage_system(self, container_obj):
        """Mark the target location occupied and link it to the container."""
        allocator = LocationAllocation()
        location_code = self.get_location_code(container_obj.target_location)
        return self._run_operations([
            (allocator.update_location_status, location_code, 'occupied'),
            (allocator.update_location_container_link, location_code, container_obj.container_code),
            (allocator.update_container_detail_status, container_obj.container_code, 2),
        ], "操作失败")

    def get_location_code(self, target_location):
        """
        Resolve a target location string to the location's ``location_code``.

        Raises:
            LocationModel.DoesNotExist: No location has the parsed coordinate.
        """
        parts = target_location.split('-')
        # Drop the warehouse prefix and the zero padding of row/col/layer.
        coordinate = f"{int(parts[1])}-{int(parts[2])}-{int(parts[3])}"
        location = LocationModel.objects.filter(coordinate=coordinate).first()
        if location is None:
            raise LocationModel.DoesNotExist(f"库位不存在: {coordinate}")
        return location.location_code

    def handle_new_allocation(self, container_obj, data):
        """Allocate a location for the container and create its inbound task."""
        allocator = LocationAllocation()
        container_code = container_obj.container_code

        # Fetch a location and run the allocation updates.
        location = allocator.get_location_by_status(container_code, data.get('current_location'))
        if not location or not self.perform_initial_allocation(allocator, location, container_code):
            return Response({'code': '400', 'message': '库位分配失败', 'data': data},
                            status=status.HTTP_400_BAD_REQUEST)

        # Build the target location and store it on the container.
        target_location = self.generate_target_location(location)
        container_obj.target_location = target_location
        container_obj.save()

        # Create the task and answer with it.
        task = self.create_inbound_task(
            container_code, data, target_location, location,
            container_id=container_obj.id,
        )
        return Response({
            'code': '200',
            'message': '任务下发成功',
            'data': task.to_dict()
        }, status=status.HTTP_200_OK)

    def perform_initial_allocation(self, allocator, location, container_code):
        """Run the allocation updates in order; return ``False`` at the first failure."""
        return self._run_operations([
            (allocator.update_location_status, location.location_code, 'reserved'),
            (allocator.update_location_group_status, location.location_code),
            (allocator.update_batch_status, container_code, '2'),
            (allocator.update_location_group_batch, location, container_code),
            (allocator.update_location_container_link, location.location_code, container_code),
            (allocator.update_container_detail_status, container_code, 2),
        ], "分配操作失败")

    def generate_target_location(self, location):
        """Format a location as ``warehouse_code-RR-CC-LL`` (zero-padded to two digits)."""
        return (
            f"{location.warehouse_code}-"
            f"{int(location.row):02d}-"
            f"{int(location.col):02d}-"
            f"{int(location.layer):02d}"
        )

    def create_inbound_task(self, container_code, data, target_location, location, container_id=None):
        """
        Create the inbound task for a container and its per-detail task rows.

        Args:
            container_code: Code of the container the task is for.
            data: Request payload; ``current_location`` is read from it.
            target_location: Formatted target location string.
            location: Allocated location object (``c_number`` is used).
            container_id: Primary key of the container. When omitted it is
                looked up from ``container_code``; the detail rows are
                filtered by this id, not by the code.

        Returns:
            The inbound ``ContainerWCSModel`` row for the container.
        """
        batch_id = LocationAllocation().get_batch(container_code)
        self.generate_task(
            container_code,
            data.get('current_location'),
            target_location,
            batch_id,
            location.c_number
        )
        task = ContainerWCSModel.objects.get(container=container_code, tasktype='inbound')
        if container_id is None:
            container_id = (
                ContainerListModel.objects
                .filter(container_code=container_code)
                .values_list('id', flat=True)
                .first()
            )
        self.inport_update_task(task.id, container_id)
        return task

    def inport_update_task(self, wcs_id, container_id):
        """
        Create one ``TaskModel`` row per container detail for a WCS task.

        Returns:
            ``True`` when the rows were created, ``False`` when the task or
            the container details are missing or an error occurred. Errors are
            logged; the rows created so far are rolled back.
        """
        try:
            # Atomic block inside the ``try`` so a failure rolls back the
            # rows already created before the error is logged.
            with transaction.atomic():
                task_obj = ContainerWCSModel.objects.filter(id=wcs_id).first()
                if not task_obj:
                    logger.info(f"入库任务 {wcs_id} 不存在")
                    return False

                details = ContainerDetailModel.objects.filter(container=container_id).all()
                if not details:
                    logger.info(f"入库任务 {container_id} 批次不存在")
                    return False

                for detail in details:
                    batch = BoundDetailModel.objects.filter(bound_batch_id=detail.batch.id).first()
                    TaskModel.objects.create(
                        task_wcs=task_obj,
                        container_detail=detail,
                        batch_detail=batch
                    )
                logger.info(f"入库任务 {wcs_id} 已更新")
                return True
        except Exception as e:
            logger.error(f"处理入库任务时发生错误: {str(e)}", exc_info=True)
            return False
-
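# PDA palletizing for inbound: save the scanned pallet code and batch information to the database
# 1. Look up the pallet object first; create it if it does not exist
# 2. Loop over every batch and look up the batch object
# 3. Update the batch data (according to the business rules)
# 4. Save to the database
# 5. Save the operation record to the database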
class ContainerDetailViewSet(viewsets.ModelViewSet):
    """
        retrieve:
            Response a data list(get)
        list:
            Response a data list(all)
        create:
            Create a data line(post)
        delete:
            Delete a data line(delete)
    """
    # authentication_classes = []  # disable all authentication classes
    # permission_classes = [AllowAny]  # allow any access

    pagination_class = MyPageNumberPagination
    filter_backends = [DjangoFilterBackend, OrderingFilter, ]
    ordering_fields = ['id', "create_time", "update_time", ]
    filter_class = ContainerDetailFilter

    def get_project(self):
        """Return the ``pk`` URL keyword argument, or ``None`` when unavailable."""
        try:
            return self.kwargs.get('pk')
        except AttributeError:
            # ``kwargs`` has not been attached to the view.
            return None

    def get_queryset(self):
        """Return non-deleted details, optionally narrowed to the URL ``pk``."""
        pk = self.get_project()
        if not self.request.user:
            return ContainerDetailModel.objects.none()
        if pk is None:
            return ContainerDetailModel.objects.filter(is_delete=False)
        return ContainerDetailModel.objects.filter(id=pk, is_delete=False)

    def get_serializer_class(self):
        """Pick the read or write serializer for the current action."""
        if self.action in ['list', 'destroy', 'retrieve']:
            return ContainerDetailGetSerializer
        elif self.action in ['create', 'update']:
            return ContainerDetailPostSerializer
        else:
            return self.http_method_not_allowed(request=self.request)

    def create(self, request, *args, **kwargs):
        """
        Record a PDA palletizing scan.

        Reads ``container`` (the pallet code) and ``batches`` from the payload,
        creates the pallet if it does not exist, and for every batch updates
        the received quantity and writes a detail row plus an operation row.
        Unknown or failing batches are logged and skipped; each batch is
        written atomically so a failure leaves no partial rows behind.

        Returns:
            A 200 ``Response`` echoing the payload with ``month`` added and
            ``container`` replaced by the pallet id.
        """
        # ``request.data`` may be an immutable QueryDict; work on a copy.
        data = request.data.copy()
        data['month'] = timezone.now().strftime('%Y%m')
        container_code = data.get('container')
        batches = data.get('batches') or []
        logger.info(f"扫描到的托盘编码 {container_code}")

        # Resolve (or create) the pallet and keep its id in the payload.
        container_obj = ContainerListModel.objects.filter(container_code=container_code).first()
        if container_obj:
            data['container'] = container_obj.id
            logger.info(f"托盘 {container_code} 已存在")
        else:
            logger.info(f"托盘 {container_code} 不存在,创建托盘对象")
            serializer_list = ContainerListPostSerializer(data={'container_code': container_code})
            serializer_list.is_valid(raise_exception=True)
            serializer_list.save()
            data['container'] = serializer_list.data.get('id')

        for batch in batches:
            bound_number = batch.get('goods_code')
            bound_obj = BoundBatchModel.objects.filter(bound_number=bound_number).first()
            if not bound_obj:
                # Only logged for now; the original author planned a
                # front-end notification for this case.
                logger.error(f"批次 {bound_number} 不存在")
                continue
            try:
                # One transaction per batch: either every row for this batch
                # is written or none is.
                with transaction.atomic():
                    self._palletize_batch(bound_obj, batch, data)
            except Exception as e:
                logger.error(f"更新批次 {bound_number} 失败: {str(e)}", exc_info=True)
                continue

        res_data = {
            "code": "200",
            "msg": "Success Create",
            "data": data
        }
        return Response(res_data, status=200)

    def _palletize_batch(self, bound_obj, batch, data):
        """Add the scanned quantity to a batch and record what was actually added."""
        goods_qty = batch.get('goods_qty')
        last_qty = bound_obj.goods_in_qty
        bound_obj.goods_in_qty += goods_qty or 0

        if bound_obj.goods_in_qty >= bound_obj.goods_qty:
            # Cap at the planned quantity and mark the batch as palletized.
            bound_obj.goods_in_qty = bound_obj.goods_qty
            bound_obj.status = 1
            self._mark_bound_detail_done(bound_obj)
        else:
            bound_obj.status = 0
        bound_obj.save()

        added_qty = bound_obj.goods_in_qty - last_qty
        logger.debug(f"新增个数 {added_qty}")
        if added_qty == goods_qty:
            memo = "入库PDA组盘,pda入库" + str(bound_obj.goods_code) + "数量" + str(goods_qty)
            self._record_palletizing(bound_obj, data, goods_qty, memo)
        elif added_qty > 0:
            # Less was added than scanned because the batch hit its cap.
            logger.warning(f"批次数量不一致: {bound_obj.bound_number}")
            memo = "入库PDA组盘,(数量不一致)pda入库" + str(bound_obj.goods_code) + "数量" + str(goods_qty)
            self._record_palletizing(bound_obj, data, added_qty, memo)
        else:
            logger.info(f"重复组盘: {bound_obj.bound_number}")

    def _mark_bound_detail_done(self, bound_obj):
        """Set the batch's bound detail to status 1 and update the bound list if all are done."""
        bound_detail_obj = BoundDetailModel.objects.filter(bound_batch=bound_obj.id).first()
        if not bound_detail_obj:
            return
        bound_detail_obj.status = 1
        bound_detail_obj.save()

        # The bound list gets status 102 once every detail has status 1.
        siblings = BoundDetailModel.objects.filter(bound_list=bound_detail_obj.bound_list_id)
        if siblings.exclude(status=1).exists():
            logger.debug('入库申请部分批次组盘完成')
            return
        bound_list_obj = BoundListModel.objects.filter(id=bound_detail_obj.bound_list_id).first()
        if bound_list_obj:
            bound_list_obj.bound_status = 102
            bound_list_obj.save()
            logger.debug('入库申请全部批次组盘完成')

    def _record_palletizing(self, bound_obj, data, quantity, memo):
        """Write the container detail row and the matching operation row."""
        creater = data.get('creater', 'zl')  # fallback default
        shared_fields = {
            "month": data['month'],
            "container": data['container'],  # pallet id
            "batch": bound_obj.id,  # foreign key to the batch
            "goods_code": bound_obj.goods_code,
            "goods_desc": bound_obj.goods_desc,
            "goods_qty": quantity,
            "goods_weight": bound_obj.goods_weight,
        }

        serializer = self.get_serializer(data={
            **shared_fields,
            "status": 1,
            "creater": creater,
        })
        serializer.is_valid(raise_exception=True)
        serializer.save()

        serializer_operate = ContainerOperationPostSerializer(data={
            **shared_fields,
            "operation_type": 'container',
            "operator": creater,
            "timestamp": timezone.now(),
            "from_location": "container",
            "to_location": "container",
            "memo": memo,
        })
        serializer_operate.is_valid(raise_exception=True)
        serializer_operate.save()

    def update(self, request, pk=None, *args, **kwargs):
        """
        Replace the detail selected by ``pk`` with the submitted data.

        Extra keyword arguments (for example the ``partial`` flag that DRF's
        ``partial_update`` forwards) are accepted so the call no longer fails
        with a ``TypeError``; ``get_serializer_class`` still decides which
        actions are allowed.
        """
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=200, headers=headers)
class ContainerOperateViewSet(viewsets.ModelViewSet):
    """
        retrieve:
            Response a data list(get)
        list:
            Response a data list(all)
        create:
            Create a data line(post)
        delete:
            Delete a data line(delete)
    """
    # authentication_classes = []  # disable all authentication classes
    # permission_classes = [AllowAny]  # allow any access

    pagination_class = MyPageNumberPagination
    filter_backends = [DjangoFilterBackend, OrderingFilter, ]
    ordering_fields = ['id', "timestamp"]
    filter_class = ContainerOperationFilter

    def get_project(self):
        """Return the ``pk`` URL keyword argument, or ``None`` when unavailable."""
        try:
            return self.kwargs.get('pk')
        except AttributeError:
            # ``kwargs`` has not been attached to the view.
            return None

    def get_queryset(self):
        """Return non-deleted operations, optionally narrowed to the URL ``pk``."""
        pk = self.get_project()
        if not self.request.user:
            return ContainerOperationModel.objects.none()
        if pk is None:
            return ContainerOperationModel.objects.filter(is_delete=False)
        return ContainerOperationModel.objects.filter(id=pk, is_delete=False)

    def get_serializer_class(self):
        """Pick the read or write serializer for the current action."""
        if self.action in ['list', 'destroy', 'retrieve']:
            return ContainerOperationGetSerializer
        elif self.action in ['create', 'update']:
            return ContainerOperationPostSerializer
        else:
            return self.http_method_not_allowed(request=self.request)

    def create(self, request, *args, **kwargs):
        """Validate and store a new operation record."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=200, headers=headers)

    def update(self, request, pk=None, *args, **kwargs):
        """
        Replace the operation selected by ``pk`` with the submitted data.

        Extra keyword arguments (for example the ``partial`` flag that DRF's
        ``partial_update`` forwards) are accepted so the call no longer fails
        with a ``TypeError``; ``get_serializer_class`` still decides which
        actions are allowed.
        """
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=200, headers=headers)
class OutboundService:
    """Static helpers that create, order and dispatch outbound WCS tasks."""

    @staticmethod
    def generate_task_id(offset=0):
        """Build a task id of the form ``outbound-<YYYYMM>-<sequence>``.

        The sequence is one more than the highest ``sequence`` stored for an
        outbound task in the current month (or 1 when there is none), plus
        ``offset``.

        Args:
            offset: Extra amount added to the sequence. Callers that build
                several unsaved tasks in a row pass their running index so
                the generated ids differ from each other.

        Returns:
            str: The task id, with the sequence zero-padded to 5 digits.
        """
        month = timezone.now().strftime("%Y%m")
        last_task = ContainerWCSModel.objects.filter(
            tasktype='outbound',
            month=int(month)
        ).order_by('-sequence').first()
        sequence = (last_task.sequence + 1 if last_task else 1) + offset
        return f"outbound-{month}-{sequence:05d}"

    @staticmethod
    def send_task_to_wcs(task):
        """Record the task's target on its container and send it to WCS.

        The HTTP calls run on a daemon thread, so this method does not wait
        for them.

        Args:
            task: A saved ``ContainerWCSModel`` row.

        Returns:
            bool: True once the sender thread has been started; False when
            no container with code ``task.container`` exists (nothing is
            sent in that case).
        """
        # Hand plain data and the primary key to the thread instead of the
        # ORM object itself.
        task_data = {
            'task_id': task.pk,
            'send_data': {
                "code":'200',
                "message": task.message,
                "data":{
                    "taskid": task.taskid,
                    "container": task.container,
                    "current_location": task.current_location,
                    "target_location": task.target_location,
                    "tasktype": task.tasktype,
                    "month": task.month,
                    "message": task.message,
                    "status": task.status,
                    # NOTE(review): fixed offset removed from the stored task
                    # number; its meaning is not visible here - confirm
                    # against the WCS interface.
                    "taskNumber": task.tasknumber-20000000000,
                    "order_number":task.order_number,
                    "sequence":task.sequence
                }
            }
        }
        container_obj = ContainerListModel.objects.filter(container_code=task.container).first()
        if container_obj is None:
            logger.error(f"托盘 {task.container} 不存在,任务 {task.taskid} 未下发")
            return False
        container_obj.target_location = task.target_location
        container_obj.save()

        # Daemon thread: it does not keep the process alive on shutdown.
        thread = threading.Thread(
            target=OutboundService._async_send_handler,
            kwargs=task_data,
            daemon=True
        )
        thread.start()
        return True

    @staticmethod
    def _async_send_handler(task_id, send_data):
        """Post ``send_data`` to both endpoints and mark the task as sent.

        The response of the first POST is ignored. When the second POST
        answers HTTP 200 the task's ``status`` is set to 200 and saved;
        otherwise the response text is logged. Any exception (including one
        raised by the first POST) is logged and ends the handler.

        Args:
            task_id: Primary key of the ``ContainerWCSModel`` row.
            send_data: JSON-serializable payload to post.
        """
        try:
            # Drop stale DB connections before using the ORM on this thread.
            close_old_connections()

            # Reload the task so the latest stored state is used.
            task = ContainerWCSModel.objects.get(pk=task_id)

            # First request: result intentionally not inspected.
            requests.post(
                "http://127.0.0.1:8008/container/batch/",
                json=send_data,
                timeout=10
            )

            # The request whose result decides the task status.
            response = requests.post(
                "http://192.168.18.67:1616/wcs/WebApi/getOutTask",
                json=send_data,
                timeout=10
            )

            if response.status_code == 200:
                task.status = 200
                task.save()
                logger.info(f"任务 {task.taskid} 已发送")
            else:
                logger.error(f"WCS返回错误: {response.text}")

        except Exception as e:
            logger.error(f"发送失败: {str(e)}")
        finally:
            close_old_connections()

    @staticmethod
    def create_initial_tasks(container_list,bound_list_id):
        """Create the pending outbound tasks for one outbound list.

        Nothing is written unless every container passes the checks; the
        rows are inserted with a single ``bulk_create``.

        Args:
            container_list: Iterable of dicts with the keys
                ``container_number`` (used as ``ContainerListModel.id``),
                ``batch_id`` and ``location_c_number``.
            bound_list_id: Id of the outbound list the tasks belong to.

        Returns:
            bool: True when the tasks were created. False when the outbound
            list already has outbound tasks, a container cannot be found, a
            container's current location differs from its target location,
            or no outbound detail matches a container's batch.
        """
        with transaction.atomic():
            already_planned = ContainerWCSModel.objects.filter(
                tasktype='outbound', bound_list_id=bound_list_id
            ).exists()
            if already_planned:
                logger.error(f"当前{bound_list_id}已有出库任务")
                return False

            tasks = []
            start_sequence = ContainerWCSModel.objects.filter(tasktype='outbound').count() + 1
            tasknumber = ContainerWCSModel.objects.filter().count()
            month = int(timezone.now().strftime("%Y%m"))
            for offset, container in enumerate(container_list):
                container_obj = ContainerListModel.objects.filter(id=container['container_number']).first()
                if container_obj is None:
                    logger.error(f"托盘 {container['container_number']} 不存在")
                    return False
                if container_obj.current_location != container_obj.target_location:
                    logger.error(f"托盘 {container_obj.container_code} 未到达目的地,不生成任务")
                    return False
                OutBoundDetail_obj = OutBoundDetailModel.objects.filter(
                    bound_list=bound_list_id,
                    bound_batch_number_id=container['batch_id']
                ).first()
                if not OutBoundDetail_obj:
                    logger.error(f"批次 {container['batch_id']} 不存在")
                    return False
                tasks.append(ContainerWCSModel(
                    # None of these rows is saved yet, so the running index
                    # is what keeps the ids of this batch distinct.
                    taskid=OutboundService.generate_task_id(offset),
                    batch=OutBoundDetail_obj.bound_batch_number,
                    batch_out=OutBoundDetail_obj.bound_batch,
                    bound_list=OutBoundDetail_obj.bound_list,
                    sequence=start_sequence + offset,
                    order_number=container['location_c_number'],
                    priority=100,
                    tasknumber=month * 100000 + (offset + 1) + tasknumber,
                    container=container_obj.container_code,
                    current_location=container_obj.current_location,
                    target_location="203",
                    tasktype="outbound",
                    month=month,
                    message="等待出库",
                    status=100,
                ))
            ContainerWCSModel.objects.bulk_create(tasks)
            logger.info(f"已创建 {len(tasks)} 个初始任务")
            return True

    @staticmethod
    def insert_new_tasks(new_tasks):
        """Insert new tasks into the pending queue and renumber it.

        Each new task is placed in front of the first pending task whose
        ``priority`` value is greater than its own, or at the end when there
        is none. Afterwards every task in the list gets ``sequence`` 1..n.

        Args:
            new_tasks: Iterable of dicts with ``container`` and
                ``current_location`` and optionally ``priority``
                (default 100) and ``target_location`` (default ``'OUT01'``).
        """
        with transaction.atomic():
            # NOTE(review): no explicit ordering is requested here, so the
            # starting order is whatever the model/database yields - confirm
            # that matches the intended queue order.
            pending_tasks = list(ContainerWCSModel.objects.select_for_update().filter(status=100))

            for offset, new_task_data in enumerate(new_tasks):
                new_task = ContainerWCSModel(
                    # Running index keeps ids of not-yet-saved tasks distinct.
                    taskid=OutboundService.generate_task_id(offset),
                    priority=new_task_data.get('priority', 100),
                    container=new_task_data['container'],
                    current_location=new_task_data['current_location'],
                    target_location=new_task_data.get('target_location', 'OUT01'),
                    tasktype="outbound",
                    month=int(timezone.now().strftime("%Y%m")),
                    message="等待出库",
                    status=100,
                )
                insert_pos = next(
                    (
                        i for i, task in enumerate(pending_tasks)
                        if new_task.priority < task.priority
                    ),
                    len(pending_tasks),
                )
                pending_tasks.insert(insert_pos, new_task)

            # Renumber; existing rows only have their sequence column written.
            for position, task in enumerate(pending_tasks, start=1):
                task.sequence = position
                if task.pk is None:
                    task.save()
                else:
                    task.save(update_fields=['sequence'])

            logger.info(f"已插入 {len(new_tasks)} 个新任务")

    @staticmethod
    def process_next_task():
        """Send the pending task (status 100) with the lowest sequence, if any."""
        next_task = ContainerWCSModel.objects.filter(status=100).order_by('sequence').first()
        if not next_task:
            logger.info("没有待处理任务")
            return
        OutboundService.send_task_to_wcs(next_task)
class OutTaskViewSet(viewsets.ModelViewSet):
    """
    # fun:get_out_task: issue outbound tasks
    # fun:get_batch_count_by_boundlist: quantity per batch of an outbound request
    # fun:generate_location_by_demand: choose the containers that cover an outbound demand
    """
    # authentication_classes = []  # disable every authentication class
    # permission_classes = [AllowAny]  # allow unrestricted access

    def get_out_task(self, request, *args, **kwargs):
        """Create the outbound tasks for a pushed request and send the first.

        Reads ``bound_list_id`` from the request body, works out the
        quantity required per batch, selects containers, creates the tasks
        and dispatches the next pending one.

        Returns:
            Response: ``{"code": "200", "msg": "Success"}`` with status 200
            on success. A payload with code ``"500"`` and status 500 when no
            batch demand is found, container selection fails, task creation
            reports failure, or an exception is raised.
        """
        try:
            data = self.request.data
            logger.info(f"收到 WMS 推送数据: {data}")

            bound_list_id = data.get('bound_list_id')
            batch_count = self.get_batch_count_by_boundlist(bound_list_id)
            logger.info(f"出库批次数量: {batch_count}")
            if not batch_count:
                # The lookup already logged the reason; do not report success.
                return Response(
                    {"code": "500", "msg": f"出库申请 {bound_list_id} 无可出库批次"},
                    status=500,
                )

            # Containers that have to leave the warehouse.
            generate_result = self.generate_location_by_demand(batch_count)
            if generate_result['code'] != '200':
                return Response(generate_result, status=500)
            container_list = generate_result['data']
            logger.info(f"生成出库任务: {container_list}")

            # Explicit ``False`` is the failure signal of the task builder.
            created = OutboundService.create_initial_tasks(container_list, bound_list_id)
            if created is False:
                return Response(
                    {"code": "500", "msg": f"出库申请 {bound_list_id} 任务生成失败"},
                    status=500,
                )

            # Send the first task right away.
            OutboundService.process_next_task()

            return Response({"code": "200", "msg": "Success"}, status=200)
        except Exception as e:
            logger.error(f"任务生成失败: {str(e)}")
            return Response({"code": "500", "msg": str(e)}, status=500)

    def get_batch_count_by_boundlist(self,bound_list_id):
        """Sum ``bound_batch.goods_out_qty`` per batch for an outbound list.

        Returns:
            dict: ``bound_batch_number_id`` -> summed quantity. Empty when
            the outbound list has no detail rows or an exception occurs.
        """
        try:
            details = OutBoundDetailModel.objects.filter(bound_list=bound_list_id)
            if not details:
                logger.error(f"查询批次数量失败: {bound_list_id} 不存在")
                return {}

            batch_count_dict = {}
            for detail in details:
                batch_key = detail.bound_batch_number_id
                if batch_key in batch_count_dict:
                    batch_count_dict[batch_key] += detail.bound_batch.goods_out_qty
                else:
                    batch_count_dict[batch_key] = detail.bound_batch.goods_out_qty
            return batch_count_dict
        except Exception as e:
            logger.error(f"查询批次数量失败: {str(e)}")
            return {}

    def get_location_by_status_and_batch(self,status,bound_id):
        """Sum ``goods_qty`` per container for one batch and detail status.

        Returns:
            dict: ``container_id`` -> summed quantity. Empty when no detail
            row matches or an exception occurs.
        """
        try:
            details = ContainerDetailModel.objects.filter(batch=bound_id, status=status)
            if not details:
                logger.error(f"查询{status}状态的批次数量失败: {bound_id} 不存在")
                return {}

            container_dict = {}
            for detail in details:
                if detail.container_id in container_dict:
                    container_dict[detail.container_id] += detail.goods_qty
                else:
                    container_dict[detail.container_id] = detail.goods_qty
            return container_dict
        except Exception as e:
            logger.error(f"查询{status}状态的批次数量失败: {str(e)}")
            return {}

    def get_order_by_batch(self,container_list,bound_id):
        """Describe the active storage location of each container.

        Args:
            container_list: Sized iterable of container ids.
            bound_id: Batch id copied into every entry as ``batch_id``.

        Returns:
            dict: container id -> location description. Empty when at least
            one container has no active location link or an exception
            occurs.
        """
        try:
            container_dict = {}
            for container in container_list:
                location_container = LocationContainerLink.objects.filter(
                    container_id=container, is_active=True
                ).first()
                if not location_container:
                    continue
                location = location_container.location
                location_c_number = location.c_number
                if container not in container_dict:
                    container_dict[container] = {
                        "container_number":container,
                        "location_c_number":location_c_number,
                        # NOTE(review): this key ends with a space; kept
                        # as-is because readers of the dict are not visible
                        # here.
                        "location_id ":location.id,
                        "location_type":location.location_type,
                        "batch_id":bound_id,
                    }
            if len(container_dict.keys()) == len(container_list):
                return container_dict
            logger.error(f"查询批次数量失败: {container_list} 不存在")
            return {}
        except Exception as e:
            logger.error(f"查询批次数量失败: {str(e)}")
            return {}

    def generate_location_by_demand(self,demand_list):
        """Select, per batch, the containers to take out to cover the demand.

        For each batch the containers holding it with detail status 2 are
        sorted by the last character of ``location_type`` (as an integer,
        ascending) and then by ``location_c_number`` descending. Containers
        are taken in that order while the accumulated quantity is still
        below the demanded quantity.

        Args:
            demand_list: Mapping batch id -> demanded quantity,
                e.g. ``{1: 25, 2: 17}``.

        Returns:
            dict: ``{"code": "200", "msg": "Success", "data": [...]}`` with
            the selected location descriptions, or ``{"code": "500",
            "msg": ...}`` when a lookup comes back empty or an exception
            occurs.
        """
        try:
            return_location = []
            for demand_id, demand_qty in demand_list.items():
                container_list = self.get_location_by_status_and_batch(2, demand_id)
                if not container_list:
                    return {"code": "500", "msg": f"批次 {demand_id} 不存在"}
                container_id_list = container_list.keys()
                container_order = self.get_order_by_batch(container_id_list,demand_id)
                if not container_order:
                    return {"code": "500", "msg": f"托盘 {container_id_list} 不存在"}
                order = sorted(
                    container_order.values(),
                    key=lambda x: (
                        int(x['location_type'][-1]),  # trailing digit of the type
                        -x['location_c_number']       # higher c_number first
                    )
                )
                current_qty = 0
                for container in order:
                    container_detail_obj = ContainerDetailModel.objects.filter(
                        container_id=container['container_number'],
                        batch_id=demand_id,
                        status=2,
                    )
                    if not container_detail_obj:
                        return {"code": "500", "msg": f"托盘上无该批次,请检查{container['container_number']} 不存在"}
                    goods_qty = sum(obj.goods_qty for obj in container_detail_obj)
                    if current_qty >= demand_qty:
                        break
                    current_qty += goods_qty
                    return_location.append(container)
            return {"code": "200", "msg": "Success", "data": return_location}
        except Exception as e:
            return {"code": "500", "msg": str(e)}
-
class BatchViewSet(viewsets.ModelViewSet):
    """Endpoint that logs whatever payload is posted to it.

    Authentication is disabled and every caller is allowed.
    """

    authentication_classes = []  # no authentication classes
    permission_classes = [AllowAny]  # open to any caller

    def wcs_post(self, request, *args, **kwargs):
        """Log the request body and acknowledge it with HTTP 200."""
        payload = self.request.data
        logger.info(f"收到 WMS 推送数据: {payload}")
        acknowledgement = {"code": "200", "msg": "Success"}
        return Response(acknowledgement, status=200)
|