from rest_framework import viewsets from utils.page import MyPageNumberPagination from utils.datasolve import sumOfList, transportation_calculate from utils.md5 import Md5 from rest_framework.filters import OrderingFilter from django_filters.rest_framework import DjangoFilterBackend from rest_framework.response import Response from rest_framework.exceptions import APIException from django.utils import timezone from django.db import transaction import logging from rest_framework import status from .models import DeviceModel,LocationModel,LocationGroupModel,LocationContainerLink,LocationChangeLog,alloction_pre from bound.models import BoundBatchModel,BoundDetailModel,BoundListModel from .filter import DeviceFilter,LocationFilter,LocationContainerLinkFilter,LocationChangeLogFilter,LocationGroupFilter from .serializers import LocationListSerializer,LocationPostSerializer from .serializers import LocationGroupListSerializer,LocationGroupPostSerializer # 以后添加模块时,只需要在这里添加即可 from rest_framework.permissions import AllowAny from container.models import ContainerListModel,ContainerDetailModel,ContainerOperationModel,TaskModel logger = logging.getLogger(__name__) # 库位分配 # 入库规则函数 # 逻辑根据批次下的托盘数目来找满足区间范围的库位,按照优先级排序, class locationViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = LocationFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return LocationModel.objects.filter() else: return LocationModel.objects.filter( id=id) else: return LocationModel.objects.none() def get_serializer_class(self): if self.action == 'list': return LocationListSerializer elif self.action == 'update': return LocationPostSerializer elif self.action =='retrieve': return LocationListSerializer def update(self, request, *args, **kwargs): qs = self.get_object() data = self.request.data location_code = data.get('location_code') # 处理库位对象 location_obj = LocationModel.objects.filter(location_code=location_code).first() if not location_obj: logger.info(f"库位 {location_code} 不存在") return Response( {'code': '400', 'message': '库位不存在', 'data': None}, status=status.HTTP_400_BAD_REQUEST ) else: data['id'] = location_obj.id logger.info(f"库位 {location_code} 已存在") serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) self.handle_group_location_status(location_code,location_obj.location_group) return Response(serializer.data, status=200, headers=headers) def handle_group_location_status(self,location_code,location_group): """ 处理库位组和库位的关联关系 :param location_code: 库位编码 :param location_group: 库位组编码 :return: """ # 1. 获取库位空闲状态的库位数目 location_obj_number = LocationModel.objects.filter( location_group=location_group, status='available' ).all().count() # 2. 获取库位组对象 logger.info(f"库位组 {location_group} 下的库位数目:{location_obj_number}") # 1. 获取库位和库位组的关联关系 location_group_obj = LocationGroupModel.objects.filter( group_code=location_group ).first() if not location_group_obj: logger.info(f"库位组 {location_group} 不存在") return None else: if location_obj_number == 0: # 库位组库位已满,更新库位组状态为full location_group_obj.status = 'full' location_group_obj.save() elif location_obj_number < location_group_obj.max_capacity: location_group_obj.status = 'occupied' location_group_obj.save() else: location_group_obj.status = 'available' location_group_obj.save() class locationGroupViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = LocationGroupFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return LocationGroupModel.objects.filter() else: return LocationGroupModel.objects.filter(id=id) else: return LocationGroupModel.objects.none() def get_serializer_class(self): if self.action == 'list': return LocationGroupListSerializer elif self.action == 'update': return LocationGroupPostSerializer elif self.action =='retrieve': return LocationGroupListSerializer def update(self, request, *args, **kwargs): data = self.request.data order_month = str(timezone.now().strftime('%Y%m')) data['month'] = order_month group_code = data.get('group_code') # 处理库位组对象 group_obj = LocationGroupModel.objects.filter(group_code=group_code).first() if group_obj: data['id'] = group_obj.id logger.info(f"库位组 {group_code} 已存在") else: logger.info(f"库位组 {group_code} 不存在,创建库位组对象") serializer_list = LocationGroupPostSerializer(data=data) serializer_list.is_valid(raise_exception=True) serializer_list.save() data['id'] = serializer_list.data.get('id') return Response(data, status=status.HTTP_201_CREATED) class LocationAllocation: # 入库规则函数 # fun:get_pallet_count_by_batch: 根据托盘码查询批次下托盘总数 # fun:get_left_locationGroup_number_by_type: 获取每层库位组剩余数量 # fun:get_location_type: 根据托盘数目获取库位类型 # fun:update_location_container_link: 更新库位和托盘的关联关系 # fun:update_location_group_batch: 更新库位组的批次 # fun:update_batch_status: 更新批次状态yes/no # fun:update_location_status: 更新库位状态和 # fun:up # fun:get_batch_status: 获取批次状态 # fun:get_batch: 获取批次 # fun:get_location_list_remainder: 获取可用库位的c_number列表 # fun:get_location_by_type_remainder: 根据库位类型获取库位 # fun:get_location_by_type: 第一次入库,根据库位类型获取库位 # fun:get_location_by_status: 根据库位状态获取库位 @transaction.atomic def get_pallet_count_by_batch(self, container_code): """ 根据托盘码查询批次下托盘总数 :param container_code: 要查询的托盘码 :return: 所属批次下的托盘总数 """ # 1. 通过托盘码获取容器详情 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: logger.error(f"托盘 {container_code} 不存在") print(f"托盘 {container_code} 不存在") return None # 2. 获取关联的批次明细 container_detail = ContainerDetailModel.objects.filter( container=container.id, status=1 ).first() if not container_detail: print (f"容器 {container_code} 未组盘") logger.error(f"容器 {container_code} 未组盘") return None else: print (f"容器 {container_code} 已组盘") batch_container = ContainerDetailModel.objects.filter( batch = container_detail.batch.id, status = 1 ).all() # 统计批次下的不同托盘 item.contianer_id batch_container_count = 0 container_ids = [] for item in batch_container: if item.container_id not in container_ids: batch_container_count = batch_container_count + 1 container_ids.append(item.container_id) batch_item = BoundBatchModel.objects.filter( bound_number = container_detail.batch.bound_number).first() if not batch_item: print(f"批次号获取失败!") return None batch_item.container_number = batch_container_count batch_item.save() return batch_container_count def get_left_locationGroup_number_by_type(self): """ 获取每层库位组剩余数量 :return: """ try: # 定义库位组和层号 group = ['T1', 'T2', 'S4', 'T4', 'T5'] layer = [1, 2, 3] # 初始化结果列表,包含三个空字典对应三个层 left_number = [{} for _ in layer] for item in group: for idx, layer_num in enumerate(layer): # 检查库位组是否存在(不考虑状态) exists = LocationGroupModel.objects.filter( group_type=item, layer=layer_num ).exists() if not exists: print(f"库位组 {item}_{layer_num} 不存在") left_number[idx][item] = 0 else: # 统计可用状态的库位组数量 count = LocationGroupModel.objects.filter( group_type=item, layer=layer_num, status='available' ).count() left_number[idx][item] = count return left_number except Exception as e: logger.error(f"获取库位组剩余数量失败:{str(e)}") print(f"获取库位组剩余数量失败:{str(e)}") return None def get_location_type(self,container_code): """ :param container_code: 托盘码 :return: 库位类型列表 """ batch = self.get_batch(container_code) if not batch: print(f"类型分配中批次号获取失败!") return None location_solution = alloction_pre.objects.filter(batch_number=batch).first() if not location_solution: # 1. 获取托盘码对应批次号下所有的托盘数目 container_number = self.get_pallet_count_by_batch(container_code) print (f"该批次下托盘总数:{container_number}") # 2. 计算每层剩余的库位数目 layer_number=self.get_pallet_count_by_batch() # 3. 查看最新库位分配方案 location_solution_last = alloction_pre.objects.filter().order_by('-id').first() if not location_solution_last: layer1_pressure =0 layer2_pressure =0 layer3_pressure =0 else: layer1_pressure = location_solution_last.layer1_pressure layer2_pressure = location_solution_last.layer2_pressure layer3_pressure = location_solution_last.layer3_pressure # 4. 根据工作压力和库位类型,使用贪心算法,(AGV只有两个库位,故只给考虑1 2 层工作压力,第三层仅作为1,2层的补充,不考虑压力)获取库位类型列表,完成任务之后更新压力/TODO:需要考虑库位类型和库位数量的关系 if layer1_pressure <=layer2_pressure: print(f"库位类型:{location_type}") # 5. 保存库位分配方案 alloction_pre.objects.create( batch_number=batch, layer1_pressure=layer1_pressure, layer2_pressure=layer2_pressure, layer3_pressure=layer3_pressure, location_type=','.join(location_type) ) return location_type else: print(f"库位类型:{location_solution.location_type}") return location_solution.location_type.split(",") @transaction.atomic def update_location_container_link(self,location_code,container_code): """ 更新库位和托盘的关联关系 :param location_code: 库位编码 :param container_code: 托盘编码 :return: """ try: # 1. 获取库位和托盘的关联关系 location = LocationModel.objects.filter( location_code=location_code ).first() container = ContainerListModel.objects.filter( container_code=container_code ).first() # 2. 如果库位和托盘的关联关系不存在,创建新的关联关系 if not LocationContainerLink.objects.filter(location=location).exists(): location_container_link = LocationContainerLink( location=location, container=container ) location_container_link.save() print(f"更新库位和托盘的关联关系成功!") return True # 3. 更新库位和托盘的关联关系 else: LocationContainerLink.objects.filter(location=location).update(location=location, container=container) print(f"更新库位和托盘的关联关系成功!") return True except Exception as e: logger.error(f"更新库位和托盘的关联关系失败:{str(e)}") print(f"更新库位和托盘的关联关系失败:{str(e)}") return False def update_location_group_batch(self,location,container_code): """ :param location: 库位对象 :param container_code: 托盘码 :return: """ try: # 1. 获取库位组 location_group = LocationGroupModel.objects.filter( group_code=location.location_group ).first() if not location_group: print(f"库位组获取失败!") return False # 2. 更新库位组的批次 bound_number=self.get_batch(container_code) if not bound_number: print(f"批次号获取失败!") return False location_group.current_batch = bound_number location_group.save() print(f"更新库位组的批次成功!") return True except Exception as e: logger.error(f"更新库位组的批次失败:{str(e)}") print(f"更新库位组的批次失败:{str(e)}") return False def update_location_status(self,location_code,status): """ 更新库位状态 :param location_code: 库位编码 :param status: 库位状态 :return: """ try: # 1. 获取库位 location = LocationModel.objects.filter( location_code=location_code ).first() if not location: print(f"库位获取失败!") return False # 2. 更新库位状态 location.status = status location.save() print(f"更新库位状态成功!") return True except Exception as e: logger.error(f"更新库位状态失败:{str(e)}") print(f"更新库位状态失败:{str(e)}") return False def update_location_group_status(self, location_code): """ 更新库位组状态 :param location_code: 库位编码 :return: """ try: # 1. 获取库位 location = LocationModel.objects.filter( location_code=location_code ).first() if not location: print(f"库位获取失败!") return False # 2. 获取库位组 location_group = LocationGroupModel.objects.filter( group_code=location.location_group ).first() if not location_group: print(f"库位组获取失败!") return False current=0 for location_item in location_group.location_items.all(): if location_item.status != 'available': current=current + 1 # 3. 更新库位组状态 if current == 0: location_group.status = 'available' elif current == location_group.max_capacity: location_group.status = 'full' else: location_group.status = 'occupied' location_group.current_goods_quantity = sum( [loc.current_quantity for loc in location_group.location_items.all()] ) location_group.current_quantity = current location_group.save() print(f"更新库位组状态成功!") return True except Exception as e: logger.error(f"更新库位组状态失败:{str(e)}") print(f"更新库位组状态失败:{str(e)}") def update_batch_status(self,container_code,status): """ 更新批次状态 :param batch_id: 批次id :param status: 批次状态 :return: """ try: # 1. 通过托盘码获取容器详情 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: logger.error(f"托盘 {container_code} 不存在") print(f"托盘 {container_code} 不存在") return None # 2. 获取关联的批次明细 container_detail = ContainerDetailModel.objects.filter( container=container.id, status=1 ).first() if not container_detail: print (f"容器 {container_code} 未组盘") logger.error(f"容器 {container_code} 未组盘") return None else: print (f"容器 {container_code} 已组盘") # 3. 更新批次状态 batch = container_detail.batch batch.status = status batch.save() print(f"更新批次状态成功!") return True except Exception as e: logger.error(f"更新批次状态失败:{str(e)}") print(f"更新批次状态失败:{str(e)}") return False def get_batch_status(self,container_code): """ 获取批次状态 :param container_code: 托盘码 :return: 批次状态 """ # 1. 通过托盘码获取容器详情 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: logger.error(f"托盘 {container_code} 不存在") print(f"托盘 {container_code} 不存在") return None # 2. 获取关联的批次明细 container_detail = ContainerDetailModel.objects.filter( container=container.id, status=1 ).first() if not container_detail: print (f"容器 {container_code} 未组盘") logger.error(f"容器 {container_code} 未组盘") return None else: print (f"容器 {container_code} 已组盘") batch_status = container_detail.batch.status return batch_status def get_batch(self,container_code): """ 获取批次 :param container_code: 托盘码 :return: 批次 """ # 1. 通过托盘码获取容器详情 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: logger.error(f"托盘 {container_code} 不存在") print(f"托盘 {container_code} 不存在") return None # 2. 获取关联的批次明细 container_detail = ContainerDetailModel.objects.filter( container=container.id, status=1 ).first() if not container_detail: print (f"容器 {container_code} 未组盘") logger.error(f"容器 {container_code} 未组盘") return None else: print (f"容器 {container_code} 已组盘") batch = container_detail.batch.bound_number return batch @transaction.atomic def get_location_list_remainder(self, location_list): """ 获取可用库位的c_number列表 :param location_list: 库位对象列表 :return: 可用库位编号列表 """ if not location_list: return None min_c_number=1000 min_c_number_index=1000 for location in location_list: if location.status != 'available': continue if int(location.c_number)