123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- from django.db import models
- from .models import *
- from django.db.models import Q
- import json
- from django.db import transaction
- from container.models import *
- from .models import LocationGroupModel
- import logging
- from collections import defaultdict
- logger = logging.getLogger(__name__)
class LocationQueries:
    """Read-side query helpers used when allocating pallets to locations.

    All public methods are static. Apart from ``divide_solution_by_layer``
    (pure data reshaping) each one wraps ORM queries.

    Methods:
        get_container(container_code): container row for a pallet code.
        get_active_container_details(container_id): first active detail row.
        get_batch_info(container_code): batch status/number and goods class.
        get_group_capacity(): available location groups per layer and type.
        get_current_pressure(): the three per-layer pressure values.
        get_pallet_count(container_code): ``container_number`` of the batch.
        get_pallet_count_by_batch(container_code): distinct pallet counts.
        get_current_finish_task(container, batch_info): finished tasks per layer.
        divide_solution_by_layer(data): per-layer type counts as a JSON string.
        get_alloction_pre_solution(batch_number): stored pre-allocation plan.
        get_left_location_group(batch_number): groups still held by a batch.
    """

    @staticmethod
    def _active_details(**filters):
        """Return the queryset of non-deleted detail rows matching *filters*.

        Rows whose ``status`` equals 3 are always excluded.
        NOTE(review): the meaning of status 3 is defined on
        ContainerDetailModel and is not visible here — confirm.
        """
        return ContainerDetailModel.objects.filter(
            is_delete=False, **filters
        ).exclude(status=3)

    @staticmethod
    def get_container(container_code):
        """Return the ContainerListModel row for *container_code*, or None."""
        return ContainerListModel.objects.filter(
            container_code=container_code
        ).first()

    @staticmethod
    def get_active_container_details(container_id):
        """Return the first active detail row of a container, or None.

        :param container_id: primary key of the container
        """
        return LocationQueries._active_details(container=container_id).first()

    @staticmethod
    def get_batch_info(container_code):
        """Classify a pallet by the batches of its active detail rows.

        Side effect: when the pallet carries more than one batch, every
        active detail row is saved with ``goods_class = 3``.

        :param container_code: pallet code to look up
        :return: ``None`` when the pallet does not exist, otherwise a dict
            with keys ``status``, ``number``, ``container`` and ``class``:
            class 2 - no active detail, or the first detail has no batch;
            class 3 - details belong to more than one batch;
            class 1 - single batch (``status`` / ``number`` come from it).
        """
        container = LocationQueries.get_container(container_code)
        if not container:
            logger.error(f"托盘 {container_code} 不存在")
            return None

        active = LocationQueries._active_details(container=container.id)
        detail = active.first()
        if not detail or detail.batch is None:
            return {
                'status': None,
                'number': None,
                'container': container,
                'class': 2,
            }

        rows = list(active)
        # Count distinct batches from the FK column itself: no extra query
        # per row, and rows without a batch are skipped instead of raising.
        batch_ids = {row.batch_id for row in rows if row.batch_id is not None}
        if len(batch_ids) > 1:
            for row in rows:
                row.goods_class = 3
                row.save()
            return {
                'status': None,
                'number': None,
                'container': container,
                'class': 3,
            }

        return {
            'status': detail.batch.status,
            'number': detail.batch.bound_number,
            'container': container,
            'class': 1,
        }

    @staticmethod
    def get_group_capacity() -> list:
        """Count available location groups per layer and group type.

        :return: one dict per layer (layers 1, 2, 3 in that order), each
            mapping a group type to the number of LocationGroupModel rows
            of that type and layer whose status is ``'available'``
        """
        group_types = ('T1', 'T2', 'S4', 'T4', 'T5')
        capacity = []
        for layer in (1, 2, 3):
            capacity.append({
                group_type: LocationGroupModel.objects.filter(
                    group_type=group_type,
                    layer=layer,
                    status='available',
                ).count()
                for group_type in group_types
            })
        return capacity

    @staticmethod
    def get_current_pressure() -> list:
        """Return ``[layer1_pressure, layer2_pressure, layer3_pressure]``.

        The values are read from the first ``base_location`` row; when the
        table is empty a row is created with the model's default values.
        """
        pressure = base_location.objects.first()
        if not pressure:
            pressure = base_location.objects.create()
        return [
            pressure.layer1_pressure,
            pressure.layer2_pressure,
            pressure.layer3_pressure,
        ]

    @staticmethod
    def get_pallet_count(container_code):
        """Return ``container_number`` of the batch the pallet belongs to.

        :param container_code: pallet code to look up
        :return: the batch's ``container_number``, or ``None`` (after logging
            an error) when the pallet does not exist, has no active detail
            row, or that row has no batch
        """
        container = LocationQueries.get_container(container_code)
        if not container:
            logger.error(f"托盘 {container_code} 不存在")
            return None

        detail = LocationQueries._active_details(container=container.id).first()
        if not detail:
            logger.error(f"托盘 {container_code} 未组盘")
            return None

        batch = detail.batch
        if batch is None:
            logger.error(f"托盘 {container_code} 未关联批次")
            return None
        return batch.container_number

    @staticmethod
    def get_pallet_count_by_batch(container_code):
        """Count the distinct pallets that share the given pallet's batch.

        :param container_code: pallet code to look up
        :return: ``(pallet_count, scatter_count)`` where ``pallet_count`` is
            the number of distinct containers with active detail rows in the
            batch and ``scatter_count`` counts only those having rows with
            ``goods_class == 3``; ``None`` (after logging an error) when the
            pallet does not exist, has no active detail row, or that row has
            no batch
        """
        container = LocationQueries.get_container(container_code)
        if not container:
            logger.error(f"托盘 {container_code} 不存在")
            return None

        detail = LocationQueries._active_details(container=container.id).first()
        if not detail:
            logger.error(f"托盘 {container_code} 未组盘")
            return None

        batch_id = detail.batch_id
        if batch_id is None:
            logger.error(f"托盘 {container_code} 未关联批次")
            return None

        in_batch = LocationQueries._active_details(batch=batch_id)
        # Sets de-duplicate container ids in O(n); only the id column is read.
        pallet_ids = set(in_batch.values_list('container_id', flat=True))
        scatter_ids = set(
            in_batch.filter(goods_class=3).values_list('container_id', flat=True)
        )
        return len(pallet_ids), len(scatter_ids)

    @staticmethod
    def get_current_finish_task(container, batch_info):
        """Return the finished-task counters of the batch's pre-allocation.

        :param container: unused; kept so existing callers keep working
        :param batch_info: dict as returned by ``get_batch_info`` (may be None)
        :return: ``None`` when *batch_info* is empty or carries no batch
            number, ``[0, 0, 0]`` when no ``alloction_pre`` row exists for the
            batch, otherwise the layer 1-3 ``task_finish_number`` values
        """
        if not batch_info:
            return None
        batch = batch_info.get('number')
        if not batch:
            return None

        solution = alloction_pre.objects.filter(batch_number=batch).first()
        if not solution:
            return [0, 0, 0]
        return [
            solution.layer1_task_finish_number,
            solution.layer2_task_finish_number,
            solution.layer3_task_finish_number,
        ]

    @staticmethod
    def divide_solution_by_layer(data) -> str:
        """Group ``"<layer>_<type>"`` strings into per-layer type counts.

        Items that are not strings, do not contain exactly one underscore,
        or whose layer part is not an integer are skipped.

        :param data: iterable of ``"<layer>_<type>"`` strings
        :return: JSON text (indent 2) mapping every layer from 1 up to the
            highest layer seen (at least layer 1) to a ``{type: count}``
            dict; layers without entries map to an empty dict
        """
        layer_counts = defaultdict(lambda: defaultdict(int))
        for item in data:
            try:
                layer, loc_type = item.split('_')
                layer_counts[int(layer)][loc_type] += 1
            except (ValueError, AttributeError):
                continue  # malformed entry: skip it

        max_layer = max(layer_counts, default=1)
        final_result = {
            str(layer): dict(layer_counts.get(layer, {}))
            for layer in range(1, max_layer + 1)
        }
        return json.dumps(final_result, indent=2)

    @staticmethod
    def get_alloction_pre_solution(batch_number):
        """Return the stored pre-allocation plan of a batch.

        :param batch_number: batch number to look up
        :return: ``layer_solution_type`` of the first matching
            ``alloction_pre`` row, or ``None`` when there is none
        """
        solution = alloction_pre.objects.filter(batch_number=batch_number).first()
        if not solution:
            return None
        return solution.layer_solution_type

    @staticmethod
    def get_left_location_group(batch_number):
        """Return the location groups still held by a batch.

        Groups whose status is ``available``, ``full``, ``disabled`` or
        ``maintenance`` are left out, so of the statuses the original note
        lists only ``occupied`` and ``reserved`` groups remain.

        :param batch_number: value matched against ``current_batch``
        :return: list of ``"<layer>_<group id>"`` strings, or ``None`` when
            no group matches
        """
        excluded_statuses = ['available', 'full', 'disabled', 'maintenance']
        # ``status__in`` is required here: a list passed to a plain
        # ``status=`` lookup is compared as one value and matches nothing.
        groups = LocationGroupModel.objects.filter(
            current_batch=batch_number
        ).exclude(status__in=excluded_statuses)
        layer_solution_type = [f"{group.layer}_{group.id}" for group in groups]
        return layer_solution_type or None
|