from rest_framework import viewsets from utils.page import MyPageNumberPagination from utils.datasolve import sumOfList, transportation_calculate from utils.md5 import Md5 from rest_framework.filters import OrderingFilter from django_filters.rest_framework import DjangoFilterBackend from rest_framework.response import Response from rest_framework.exceptions import APIException from django.utils import timezone from django.db import transaction import logging from rest_framework import status from .models import DeviceModel,LocationModel,LocationGroupModel,LocationContainerLink,LocationChangeLog,alloction_pre,base_location from bound.models import BoundBatchModel,BoundDetailModel,BoundListModel from .filter import DeviceFilter,LocationFilter,LocationContainerLinkFilter,LocationChangeLogFilter,LocationGroupFilter from .serializers import LocationListSerializer,LocationPostSerializer from .serializers import LocationGroupListSerializer,LocationGroupPostSerializer # 以后添加模块时,只需要在这里添加即可 from rest_framework.permissions import AllowAny from container.models import ContainerListModel,ContainerDetailModel,ContainerOperationModel,TaskModel from django.db.models import Prefetch import copy import json from collections import defaultdict logger = logging.getLogger(__name__) # 库位分配 # 入库规则函数 # 逻辑根据批次下的托盘数目来找满足区间范围的库位,按照优先级排序, class locationViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = LocationFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() # 预取激活的关联托盘(通过中间模型过滤) prefetch_containers = Prefetch( 'current_containers', queryset=ContainerListModel.objects.filter( locationcontainerlink__is_active=True # 确保中间模型字段名正确 ).distinct(), to_attr='active_containers' # 将结果存储到模型的 active_containers 属性 ) if self.request.user: if id is None: return LocationModel.objects.prefetch_related(prefetch_containers).all() else: return LocationModel.objects.prefetch_related(prefetch_containers).filter(id=id) else: return LocationModel.objects.none() def get_serializer_class(self): if self.action == 'list': return LocationListSerializer elif self.action == 'update': return LocationPostSerializer elif self.action =='retrieve': return LocationListSerializer def update(self, request, *args, **kwargs): qs = self.get_object() data = self.request.data location_code = data.get('location_code') # 处理库位对象 location_obj = LocationModel.objects.filter(location_code=location_code).first() if not location_obj: logger.info(f"库位 {location_code} 不存在") return Response( {'code': '400', 'message': '库位不存在', 'data': None}, status=status.HTTP_400_BAD_REQUEST ) else: data['id'] = location_obj.id logger.info(f"库位 {location_code} 已存在") serializer = self.get_serializer(qs, data=data) serializer.is_valid(raise_exception=True) serializer.save() headers = self.get_success_headers(serializer.data) self.handle_group_location_status(location_code,location_obj.location_group) return Response(serializer.data, status=200, headers=headers) def handle_group_location_status(self,location_code,location_group): """ 处理库位组和库位的关联关系 :param location_code: 库位编码 :param location_group: 库位组编码 :return: """ # 1. 获取库位空闲状态的库位数目 location_obj_number = LocationModel.objects.filter( location_group=location_group, status='available' ).all().count() # 2. 获取库位组对象 logger.info(f"库位组 {location_group} 下的库位数目:{location_obj_number}") # 1. 获取库位和库位组的关联关系 location_group_obj = LocationGroupModel.objects.filter( group_code=location_group ).first() if not location_group_obj: logger.info(f"库位组 {location_group} 不存在") return None else: if location_obj_number == 0: # 库位组库位已满,更新库位组状态为full location_group_obj.status = 'full' location_group_obj.save() elif location_obj_number < location_group_obj.max_capacity: location_group_obj.status = 'occupied' location_group_obj.save() else: location_group_obj.status = 'available' location_group_obj.save() class locationGroupViewSet(viewsets.ModelViewSet): """ retrieve: Response a data list(get) list: Response a data list(all) create: Create a data line(post) delete: Delete a data line(delete) """ # authentication_classes = [] # 禁用所有认证类 # permission_classes = [AllowAny] # 允许任意访问 pagination_class = MyPageNumberPagination filter_backends = [DjangoFilterBackend, OrderingFilter, ] ordering_fields = ['id', "create_time", "update_time", ] filter_class = LocationGroupFilter def get_project(self): try: id = self.kwargs.get('pk') return id except: return None def get_queryset(self): id = self.get_project() if self.request.user: if id is None: return LocationGroupModel.objects.filter() else: return LocationGroupModel.objects.filter(id=id) else: return LocationGroupModel.objects.none() def get_serializer_class(self): if self.action == 'list': return LocationGroupListSerializer elif self.action == 'update': return LocationGroupPostSerializer elif self.action =='retrieve': return LocationGroupListSerializer def update(self, request, *args, **kwargs): data = self.request.data order_month = str(timezone.now().strftime('%Y%m')) data['month'] = order_month group_code = data.get('group_code') # 处理库位组对象 group_obj = LocationGroupModel.objects.filter(group_code=group_code).first() if group_obj: data['id'] = group_obj.id logger.info(f"库位组 {group_code} 已存在") else: logger.info(f"库位组 {group_code} 不存在,创建库位组对象") serializer_list = LocationGroupPostSerializer(data=data) serializer_list.is_valid(raise_exception=True) serializer_list.save() data['id'] = serializer_list.data.get('id') return Response(data, status=status.HTTP_201_CREATED) class LocationAllocation: # 入库规则函数 # fun:get_pallet_count_by_batch: 根据托盘码查询批次下托盘总数 # fun:get_left_locationGroup_number_by_type: 获取每层库位组剩余数量 # fun:get_location_type: 根据托盘数目获取库位类型 # fun:update_location_container_link: 更新库位和托盘的关联关系 # fun:update_location_group_batch: 更新库位组的批次 # fun:update_batch_status: 更新批次状态yes/no # fun:update_location_status: 更新库位状态和 # fun:up # fun:get_batch_status: 获取批次状态 # fun:get_batch: 获取批次 # fun:get_location_list_remainder: 获取可用库位的c_number列表 # fun # fun:get_location_by_type_remainder: 根据库位类型获取库位 # fun:get_location_by_type: 第一次入库,根据库位类型获取库位 # fun:get_location_by_status: 根据库位状态获取库位 @transaction.atomic def get_pallet_count_by_batch(self, container_code): """ 根据托盘码查询批次下托盘总数 :param container_code: 要查询的托盘码 :return: 所属批次下的托盘总数 """ # 1. 通过托盘码获取容器详情 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: logger.error(f"托盘 {container_code} 不存在") return None # 2. 获取关联的批次明细 container_detail = ContainerDetailModel.objects.filter( container=container.id ).exclude(status = 3).first() if not container_detail: logger.error(f"托盘 {container_code} 未组盘") return None batch_container = ContainerDetailModel.objects.filter( batch = container_detail.batch.id, status = 1 ).all() # 统计批次下的不同托盘 item.contianer_id batch_container_count = 0 container_ids = [] for item in batch_container: if item.container_id not in container_ids: batch_container_count = batch_container_count + 1 container_ids.append(item.container_id) batch_item = BoundBatchModel.objects.filter( bound_number = container_detail.batch.bound_number).first() if not batch_item: print(f"批次号获取失败!") return None batch_item.container_number = batch_container_count batch_item.save() return batch_container_count def get_left_locationGroup_number_by_type(self): """ 获取每层库位组剩余数量 :return: """ try: # 定义库位组和层号 group = ['T1', 'T2', 'S4', 'T4', 'T5'] layer = [1, 2, 3] # 初始化结果列表,包含三个空字典对应三个层 left_number = [{} for _ in layer] for item in group: for idx, layer_num in enumerate(layer): # 检查库位组是否存在(不考虑状态) exists = LocationGroupModel.objects.filter( group_type=item, layer=layer_num ).exists() if not exists: print(f"库位组 {item}_{layer_num} 不存在") left_number[idx][item] = 0 else: # 统计可用状态的库位组数量 count = LocationGroupModel.objects.filter( group_type=item, layer=layer_num, status='available' ).count() left_number[idx][item] = count return left_number except Exception as e: logger.error(f"获取库位组剩余数量失败:{str(e)}") print(f"获取库位组剩余数量失败:{str(e)}") return None @transaction.atomic def update_location_container_link(self,location_code,container_code): """ 更新库位和托盘的关联关系 :param location_code: 库位编码 :param container_code: 托盘编码 :return: """ try: # 1. 获取库位和托盘的关联关系 location = LocationModel.objects.filter( location_code=location_code ).first() container = ContainerListModel.objects.filter( container_code=container_code ).first() # 2. 如果库位和托盘的关联关系不存在,创建新的关联关系 if not LocationContainerLink.objects.filter(location=location).exists(): location_container_link = LocationContainerLink( location=location, container=container ) location_container_link.save() print(f"更新库位和托盘的关联关系成功!") return True # 3. 更新库位和托盘的关联关系 else: LocationContainerLink.objects.filter(location=location).update(location=location, container=container) print(f"更新库位和托盘的关联关系成功!") return True except Exception as e: logger.error(f"更新库位和托盘的关联关系失败:{str(e)}") print(f"更新库位和托盘的关联关系失败:{str(e)}") return False def update_container_detail_status(self,container_code,status): try: # 1. 获取托盘 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: print(f"托盘 {container_code} 不存在") return False # 2. 更新托盘状态 container_detail = ContainerDetailModel.objects.filter( container=container.id ).exclude(status=3).first() if not container_detail: print(f"托盘 {container_code} 未组盘_from update_container_detail_status") return False container_detail.status = status container_detail.save() print(f"更新托盘状态成功!") return True except Exception as e: logger.error(f"更新托盘状态失败:{str(e)}") print(f"更新托盘状态失败:{str(e)}") return False def update_location_group_batch(self,location,container_code): """ :param location: 库位对象 :param container_code: 托盘码 :return: """ try: # 1. 获取库位组 location_group = LocationGroupModel.objects.filter( group_code=location.location_group ).first() if not location_group: print(f"库位组获取失败!") return False # 2. 更新库位组的批次 bound_number=self.get_batch(container_code) if not bound_number: print(f"批次号获取失败!") return False location_group.current_batch = bound_number location_group.save() print(f"更新库位组的批次成功!") return True except Exception as e: logger.error(f"更新库位组的批次失败:{str(e)}") print(f"更新库位组的批次失败:{str(e)}") return False def update_location_status(self,location_code,status): """ 更新库位状态 :param location_code: 库位编码 :param status: 库位状态 :return: """ try: # 1. 获取库位 location = LocationModel.objects.filter( location_code=location_code ).first() if not location: print(f"库位获取失败!") return False # 2. 更新库位状态 location.status = status location.save() print(f"更新库位状态成功!") return True except Exception as e: logger.error(f"更新库位状态失败:{str(e)}") print(f"更新库位状态失败:{str(e)}") return False def update_group_status_reserved(self,location_group_list): """ 更新库位组状态 :param location_group_list: 库位组对象列表 :return: """ try: for location_group in location_group_list: # 1. 获取库位组 if not location_group: print(f"库位组获取失败!") return False # 2. 更新库位组状态 location_group_id = location_group.split('_')[1] location_group_item = LocationGroupModel.objects.filter( id=location_group_id ).first() if not location_group_item: print(f"库位组 {location_group} 不存在") return False # 3. 更新库位组状态 location_group_item.status = 'reserved' location_group_item.save() return True except Exception as e: logger.error(f"更新库位组状态失败:{str(e)}") print(f"更新库位组状态失败:{str(e)}") return False @transaction.atomic def update_location_group_status(self, location_code): """ 更新库位组状态 :param location_code: 库位编码 :return: """ try: # 1. 获取库位 location = LocationModel.objects.filter( location_code=location_code ).first() if not location: print(f"库位获取失败!") return False # 2. 获取库位组 location_group = LocationGroupModel.objects.filter( group_code=location.location_group ).first() if not location_group: print(f"库位组获取失败!") return False current=0 for location_item in location_group.location_items.all(): if location_item.status != 'available': current=current + 1 # 3. 更新库位组状态 if current == 0: location_group.status = 'available' elif current == location_group.max_capacity: location_group.status = 'full' else: location_group.status = 'occupied' location_group.current_goods_quantity = sum( [loc.current_quantity for loc in location_group.location_items.all()] ) location_group.current_quantity = current location_group.save() print(f"更新库位组状态成功!") return True except Exception as e: logger.error(f"更新库位组状态失败:{str(e)}") print(f"更新库位组状态失败:{str(e)}") def update_batch_status(self,container_code,status): """ 更新批次状态 :param batch_id: 批次id :param status: 批次状态 :return: """ try: # 1. 通过托盘码获取托盘详情 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: logger.error(f"托盘 {container_code} 不存在") print(f"托盘 {container_code} 不存在") return None # 2. 获取关联的批次明细 container_detail = ContainerDetailModel.objects.filter( container=container.id ).exclude(status=3).first() if not container_detail: print (f"托盘 {container_code} 未组盘") logger.error(f"托盘 {container_code} 未组盘_from update_batch_status") return None # 3. 更新批次状态 batch = container_detail.batch batch.status = status batch.save() print(f"更新批次状态成功!") return True except Exception as e: logger.error(f"更新批次状态失败:{str(e)}") print(f"更新批次状态失败:{str(e)}") return False def get_batch_status(self,container_code): """ 获取批次状态 :param container_code: 托盘码 :return: 批次状态 """ # 1. 通过托盘码获取容器详情 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: logger.error(f"托盘 {container_code} 不存在") print(f"托盘 {container_code} 不存在") return None # 2. 获取关联的批次明细 container_detail = ContainerDetailModel.objects.filter( container=container.id ).exclude(status=3).first() if not container_detail: print (f"托盘 {container_code} 未组盘") logger.error(f"托盘 {container_code} 未组盘_from get_batch_status") return None batch_status = container_detail.batch.status return batch_status def get_batch(self,container_code): """ 获取批次 :param container_code: 托盘码 :return: 批次 """ # 1. 通过托盘码获取容器详情 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: logger.error(f"托盘 {container_code} 不存在") print(f"托盘 {container_code} 不存在") return None # 2. 获取关联的批次明细 container_detail = ContainerDetailModel.objects.filter( container=container.id ).exclude(status=3).first() if not container_detail: print (f"托盘 {container_code} 未组盘") logger.error(f"托盘 {container_code} 未组盘_from get_batch") return None batch = container_detail.batch.bound_number return batch @transaction.atomic def get_location_list_remainder(self, location_group_list,container_code): """ 获取可用库位的c_number列表 :param location_list: 库位对象列表 :return: 可用库位编号列表 """ if not location_group_list: return None min_c_number=1000 min_c_number_index=1000 current_task = self.get_current_finish_task(container_code) print(f"[1]当前已完成任务: {current_task}") # 按压力排序 sorted_pressure = sorted( [(0, current_task[0]), (1, current_task[1]), (2, current_task[2])], key=lambda x: (-x[1], x[0]) ) # 交换第一和第二个元素的位置 sorted_pressure[0], sorted_pressure[1] = sorted_pressure[1], sorted_pressure[0] print(f"[2]任务排序: {sorted_pressure}") print(f"[3]当前选择:{sorted_pressure[0][0]+1}") location_type_dict = json.loads(self.divide_solution_by_layer(location_group_list)) # print(f"库位类型分配方案: {location_type_dict}") for layer, _ in sorted_pressure: if not location_type_dict.get(str(layer+1)): continue # print(f"当前层: {layer+1}") # print(f"当前层库位组: {location_type_dict[str(layer+1)].keys()}") for group_id in location_type_dict[str(layer+1)].keys(): location_group = LocationGroupModel.objects.filter( id=group_id, ).first() if not location_group: continue location_list = location_group.location_items.filter( status='available' ).all().order_by('c_number') if not location_list: print(f"当前层库位组 {location_group.group_code} 可用库位: None") continue # 提取所有库位的 c_number c_numbers = [loc.c_number for loc in location_list] print(f"当前层库位组 {location_group.group_code} 可用库位: {c_numbers}") # 更新任务完成数目 current_task[layer] = current_task[layer] + 1 self.update_current_finish_task(container_code,current_task) return location_list[0] @transaction.atomic def get_location_type(self, container_code): """ 智能库位分配核心算法 :param container_code: 托盘码 :return: 库位类型分配方案 """ try: batch = self.get_batch(container_code) if not batch: logger.error("批次信息获取失败") return None # 检查已有分配方案 existing_solution = alloction_pre.objects.filter(batch_number=batch).first() if existing_solution: return existing_solution.layer_pre_type # 获取关键参数 total_pallets = self.get_pallet_count_by_batch(container_code) layer_capacity = self.get_left_locationGroup_number_by_type() current_pressure = self.get_current_pressure() # 测试参数 # total_pallets = 30 # layer_capacity = [{'T1': 29, 'T2': 14, 'S4': 10, 'T4': 27, 'T5': 27}, {'T1': 0, 'T2': 0, 'S4': 0, 'T4': 0, 'T5': 21}, {'T1': 29, 'T2': 14, 'S4': 10, 'T4': 27, 'T5': 27}] # current_pressure = [1,0,0] print(f"[1]托盘数目: {total_pallets}") print(f"[2]层容量: {layer_capacity}") # print(f"[3]当前压力: {current_pressure}") # 定义库位容量表 LOCATION_CAPACITY = {'T1':1, 'T2':2, 'T4':4, 'S4':4, 'T5':5} def allocate(remain, path, pressure,real_pressure,layer_capacity_state, depth=0): # 终止条件 if remain <= 0: return [path,real_pressure] # 深拷贝当前层容量状态 new_layer_capacity = copy.deepcopy(layer_capacity_state) # print(f"[2]当前剩余: {new_layer_capacity}") # 压力平衡系数 balance_factor = 1.0 - (0.1 * min(depth, 5)) # 层选择策略 print (f"[3]当前压力: {pressure}") layer_priority = sorted( [(0, pressure[0]), (1, pressure[1]), (2, pressure[2])], key=lambda x: (x[1] * balance_factor, x[0]) ) for layer, _ in layer_priority: # 生成候选库位类型(按效率和容量排序) # 排序键函数 : # min(x[1], remain) 计算当前库位类型的容量 c 和剩余数量 remain 中的较小值。 # -min(x[1], remain) 和 -x[1] 都使用了负号,这意味着排序是按降序进行的。 # 首先按 -min(x[1], remain) 排序,即优先选择容量与剩余数量更接近的库位类型。 # 如果有多个库位类型的容量与剩余数量相同,则按 -x[1] 排序,即优先选择容量更大的库位类型。 print(f"[4]当前层: {layer+1}, 剩余: {remain}, 容量状态: {new_layer_capacity[layer]}") candidates = sorted( [(t, c) for t, c in LOCATION_CAPACITY.items() if new_layer_capacity[layer].get(t,0) > 0], key=lambda x: (abs(x[1]-remain), -x[1]) ) print(f"[4]候选库位类型: {candidates}") for loc_type, cap in candidates: # 更新容量状态 updated_capacity = copy.deepcopy(new_layer_capacity) updated_capacity[layer][loc_type] -= 1 # 占用一个库位组 # 允许适度空间浪费(当剩余<2时) # effective_cap = min(cap, remain) if (cap - remain) < 2 else cap effective_cap = min(cap, remain) if effective_cap <= remain: new_remain = remain - effective_cap new_pressure = pressure.copy() for i in range(0, 3): new_pressure[i] -=1 if new_pressure[i] > 0 else 0 new_pressure[layer] += effective_cap # 按实际存放数计算压力,此时别的楼层压力可能降下来了 real_pressure[layer] += effective_cap # 实际压力 result = allocate( new_remain, path + [f"{layer+1}_{loc_type}"], new_pressure, real_pressure, updated_capacity, depth + 1 ) if result: print (f"[5]分配方案: {result}") return result return None # 执行分配 allocation = allocate(total_pallets, [], [current_pressure[0], current_pressure[1],current_pressure[2]],[current_pressure[0], current_pressure[1],current_pressure[2]], layer_capacity) if not allocation: logger.error("无法生成有效分配方案") return None # 保存分配方案 allocation_json = self.divide_solution_by_layer(allocation[0]) print(f"[6]分配方案: {allocation_json}") solution = alloction_pre( batch_number=batch, layer_pre_type =allocation_json ) solution_pressure, created = base_location.objects.get_or_create( id=1, defaults={ 'layer1_pressure': 0, 'layer2_pressure': 0, 'layer3_pressure': 0 } ) solution_pressure.layer1_pressure = allocation[1][0] solution_pressure.layer2_pressure = allocation[1][1] solution_pressure.layer3_pressure = allocation[1][2] solution.save() solution_pressure.save() return [loc.split('_')[1] for loc in allocation[0]] except Exception as e: logger.error(f"分配算法异常:{str(e)}") return None def divide_solution_by_layer(self, data): # 统计所有存在的层级 layer_counts = defaultdict(lambda: defaultdict(int)) existing_layers = set() for item in data: # 分割层级和类型 try: layer, loc_type = item.split('_') layer_num = int(layer) existing_layers.add(layer_num) layer_counts[layer_num][loc_type] += 1 except (ValueError, IndexError): continue # 跳过无效格式的数据 # 确定最大层级(至少包含1层) max_layer = max(existing_layers) if existing_layers else 1 # 构建包含所有层级的最终结果 final_result = {} for layer in range(1, max_layer + 1): final_result[str(layer)] = dict(layer_counts.get(layer, {})) return json.dumps(final_result, indent=2) def get_current_pressure(self): """获取实时工作压力""" last_solution = base_location.objects.order_by('-id').first() if not last_solution: base_location.objects.create( layer1_pressure=0, layer2_pressure=0, layer3_pressure=0, ).save() return [ last_solution.layer1_pressure if last_solution else 0, last_solution.layer2_pressure if last_solution else 0, last_solution.layer3_pressure if last_solution else 0, ] def get_current_finish_task(self,container): batch = self.get_batch(container) if not batch: return None solution = alloction_pre.objects.filter(batch_number=batch).first() if not solution: return None return [solution.layer1_task_finish_number,solution.layer2_task_finish_number,solution.layer3_task_finish_number] def update_current_finish_task(self,container,task_finish_number): batch = self.get_batch(container) if not batch: return None solution = alloction_pre.objects.filter(batch_number=batch).first() if not solution: return None solution.layer1_task_finish_number = task_finish_number[0] solution.layer2_task_finish_number = task_finish_number[1] solution.layer3_task_finish_number = task_finish_number[2] solution.save() return True @transaction.atomic def get_location_by_type(self, location_type_list, start_location, container_code): """ 根据库位类型获取库位,先根据工作压力找出最空闲的层,看下这层有没有工作,如果有,就从里面找,如果没有,则跳过该层,继续找下一层 :param location_type_list: 库位分配方案 { "1": { "S4": 1 }, "2": {}, "3": { "T5": 1 } } :param start_location: 起始位置(决定优先级排序方式) :return: 符合条件的库位列表 """ locations = [] # 检查已有分配方案 existing_solution = alloction_pre.objects.filter(batch_number=self.get_batch(container_code)).first() if existing_solution.layer_solution_type: print(f"[0]已有库位分配方案:{existing_solution.layer_solution_type}") return existing_solution.layer_solution_type for layer, location_type_dict in location_type_list.items(): if not location_type_dict: continue # 获取库位类型列表 location_type = list(location_type_dict.keys()) demand_number = sum(location_type_dict.values()) print (f"[1]层{layer} 需求数量: {demand_number}, 库位: {location_type}") location_groups = LocationGroupModel.objects.filter( group_type__in=location_type, layer=layer, status='available' ) if not location_groups: print(f"层{layer} 无库位") # 根据起始位置选择排序字段 if start_location == '203': ordered_groups = location_groups.order_by('left_priority') elif start_location == '103': ordered_groups = location_groups.order_by('right_priority') else: ordered_groups = location_groups.none() number = 0 for location_group in ordered_groups: if number >= demand_number: break locations.append(f"{layer}_{location_group.id}") number += 1 existing_solution.layer_solution_type = locations existing_solution.save() print(f"[2]分配方案: {locations}") return locations if locations else None @transaction.atomic def get_location_by_status(self,container_code,start_location): """ 根据库位状态获取库位 :param location_type: 库位类型 :param start_location: 起始库位 if in1 优先考虑left_priority, if in2 优先考虑right_priority 就是获取库位组列表之后进行排序 :return: 库位列表 """ # 1. 获取批次状态 1 为已组盘 2 为部分入库 3 为全部入库 status = self.get_batch_status(container_code) # if status == 1: # 2. 获取库位组 print(f"[1]第一次入库") # 重新获取最新数据 self.get_location_type(container_code) location_type_list = json.loads(alloction_pre.objects.filter(batch_number=self.get_batch(container_code)).first().layer_pre_type) location_list = self.get_location_by_type(location_type_list,start_location,container_code) # 预定这些库组 self.update_group_status_reserved(location_list) location_min_value = self.get_location_list_remainder(location_list,container_code) print(f"库位安排到第{location_min_value.c_number}个库位:{location_min_value}") # if not location_list[location_min_index]: # # 库位已满,返回None # return None # else: # return location_list[location_min_index] return location_min_value elif status == 2: # 3. 获取部分入库库位 print (f"部分入库") location_list = alloction_pre.objects.filter(batch_number=self.get_batch(container_code)).first().layer_solution_type location_min_value = self.get_location_list_remainder(location_list,container_code) print(f"库位安排到第{location_min_value.c_number}个库位:{location_min_value}") return location_min_value def release_location(self, location_code): """释放库位并更新关联数据""" try: location = LocationModel.objects.get(location_code=location_code) links = LocationContainerLink.objects.get(location=location, is_active=True) print(f"释放库位: {location_code}, 关联容器: {links.container_id}") # 解除关联并标记为非活跃 links.is_active = False links.save() return True except Exception as e: logger.error(f"释放库位失败: {str(e)}") return False