from django.db import models from .models import * from django.db.models import Q import json from django.db import transaction from container.models import * from .models import LocationGroupModel import logging from collections import defaultdict logger = logging.getLogger(__name__) class LocationQueries: """" 库位相关查询 functions: get_container(container_code): 获取托盘信息 params: container_code: 托盘编号 return: ContainerListModel get_active_container_details(container_id): 获取托盘详情 params: container_id: 托盘ID return: ContainerDetailModel get_active_container_details(container_id): 获取托盘详情 params: container_id: 托盘ID return: ContainerDetailModel get_batch_info(container_code): 获取托盘批次信息 params: container_code: 托盘编号 return: dict get_group_capacity(): 获取库位组空闲容量 return: list get_current_pressure(): 获取仓库当前工作压力 return: list get_pallet_count(container_code): 获取托盘数量 params: container_code: 托盘编号 return: int """ @staticmethod def get_container(container_code): return ContainerListModel.objects.filter(container_code=container_code).first() @staticmethod def get_active_container_details(container_id): return ContainerDetailModel.objects.filter( container=container_id,is_delete=False ).exclude(status=3).first() @staticmethod def get_batch_info(container_code): container = ContainerListModel.objects.filter( container_code=container_code ).first() detail = ContainerDetailModel.objects.filter( container=container.id,is_delete=False ).exclude(status=3).first() if not detail: return { 'status': None, 'number': None, 'container': container, 'class': 2 , } elif detail.batch is None: return { 'status': None, 'number': None, 'container': container, 'class': 2, } else: detail_all = ContainerDetailModel.objects.filter( container=container.id,is_delete=False ).exclude(status=3).all() # 统计该拖盘上的不同批次数目 batch_count = len(set([item.batch.id for item in detail_all])) if batch_count > 1: for item in detail_all: item.goods_class = 3 item.save() return { 'status': None, 'number': None, 'container': container, 'class': 3, } else: return { 'status': detail.batch.status , 'number': detail.batch.bound_number , 'container': container, 'class': 1, } @staticmethod def get_group_capacity(): groups = ['T1', 'T2', 'S4', 'T4', 'T5'] capacity = [] for layer in [1, 2, 3]: layer_data = {} for group in groups: count = LocationGroupModel.objects.filter( group_type=group, layer=layer, status='available' ).count() layer_data[group] = count capacity.append(layer_data) return capacity @staticmethod def get_current_pressure(): pressure = base_location.objects.first() if not pressure: pressure = base_location.objects.create() return [pressure.layer1_pressure, pressure.layer2_pressure, pressure.layer3_pressure] @staticmethod def get_pallet_count(container_code): """ 获取托盘数量 :param container_code: 要查询的托盘码 :return: 所属批次下的托盘总数 """ # 1. 通过托盘码获取容器详情 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: logger.error(f"托盘 {container_code} 不存在") return None container_detail = ContainerDetailModel.objects.filter( container=container.id,is_delete=False ).exclude(status = 3).first() batch_obj = container_detail.batch return batch_obj.container_number @staticmethod def get_pallet_count_by_batch(container_code): """ 根据托盘码查询批次下托盘总数 :param container_code: 要查询的托盘码 :return: 所属批次下的托盘总数 """ # 1. 通过托盘码获取容器详情 container = ContainerListModel.objects.filter( container_code=container_code ).first() if not container: logger.error(f"托盘 {container_code} 不存在") return None container_detail = ContainerDetailModel.objects.filter( container=container.id,is_delete=False ).exclude(status = 3).first() if not container_detail: logger.error(f"托盘 {container_code} 未组盘") return None batch_container = ContainerDetailModel.objects.filter( batch = container_detail.batch.id,is_delete=False ).all().exclude(status = 3) # 统计批次下的不同托盘 item.contianer_id batch_container_count = 0 container_ids = [] for item in batch_container: if item.container_id not in container_ids: batch_container_count = batch_container_count + 1 container_ids.append(item.container_id) batch_container_scatter = ContainerDetailModel.objects.filter( batch = container_detail.batch.id,is_delete=False,goods_class=3 ).all().exclude(status = 3) batch_container_scatter_count = 0 container_scatter_ids = [] for item in batch_container_scatter: if item.container_id not in container_scatter_ids: batch_container_scatter_count = batch_container_scatter_count + 1 container_scatter_ids.append(item.container_id) return batch_container_count,batch_container_scatter_count @staticmethod def get_current_finish_task(container,batch_info): batch = batch_info.get('number') if not batch: return None solution = alloction_pre.objects.filter(batch_number=batch).first() if not solution: return [0,0,0] return [solution.layer1_task_finish_number,solution.layer2_task_finish_number,solution.layer3_task_finish_number] @staticmethod def divide_solution_by_layer(data): # 统计所有存在的层级 layer_counts = defaultdict(lambda: defaultdict(int)) existing_layers = set() for item in data: # 分割层级和类型 try: layer, loc_type = item.split('_') layer_num = int(layer) existing_layers.add(layer_num) layer_counts[layer_num][loc_type] += 1 except (ValueError, IndexError): continue # 跳过无效格式的数据 # 确定最大层级(至少包含1层) max_layer = max(existing_layers) if existing_layers else 1 # 构建包含所有层级的最终结果 final_result = {} for layer in range(1, max_layer + 1): final_result[str(layer)] = dict(layer_counts.get(layer, {})) return json.dumps(final_result, indent=2) @staticmethod def get_alloction_pre_solution(batch_number): """ 获取指定批次的预留方案 :param batch_number: 批次号 :return: 预留方案 """ solution = alloction_pre.objects.filter(batch_number=batch_number).first() if not solution: return None return solution.layer_solution_type @staticmethod def get_left_location_group(batch_number): """ 获取指定批次的剩余库位 :param batch_number: 批次号 :return: 剩余库位 x ('available', '可用'), ('occupied', '占用'), x ('full', '满'), x ('disabled', '禁用'), ('reserved', '预留'), x ('maintenance', '维护中') """ layer_solution_type =[] location_group_obj = LocationGroupModel.objects.filter(current_batch=batch_number).exclude(status__in=['available', 'full', 'disabled', 'maintenance']) if not location_group_obj: return None else: for item in location_group_obj: layer_solution_type.append(f"{item.layer}_{item.id}") return layer_solution_type