import logging

from django.db import transaction

# NOTE: the relative order of the two star imports is preserved on purpose;
# swapping them could change which module wins on a name clash.
from .models import *
from container.models import *

logger = logging.getLogger(__name__)


class LocationUpdates:
    """Write-side helpers for storage locations, location groups and containers.

    All helpers are static methods; call them on the class, e.g.
    ``LocationUpdates.link_container(code, container_code)``.

    Methods defined here:
        link_container(location_code, container_code): attach a container to a location
        update_batch_status(container_code, status): set the status of a container's batches
        save_allocation_plan(batch_number, solution, pressure): persist an allocation plan
        update_pallet_count(batch_container_count, bound_number): store a batch's container count
        update_group_status_reserved(location_group_list): mark location groups as reserved
        update_current_finish_task(container, task_finish_number): store per-layer finished-task counts
        update_location_group_status(location_code): recompute a location group's status
        update_location_group_batch(location, bound_number): set a location group's current batch
        update_container_detail_status(container_code, status): set the status of container details
    """

    @staticmethod
    @transaction.atomic
    def link_container(location_code, container_code):
        """Link a container to a location, creating or updating the link row.

        The location row is fetched with ``select_for_update`` so it stays
        locked for the duration of the transaction.

        :param location_code: code of the location to link
        :param container_code: code of the container to attach
        :return: True on success
        :raises RuntimeError: if the location/container lookup or the update
            fails; the original exception is attached as ``__cause__``
        """
        try:
            location = LocationModel.objects.select_for_update().get(
                location_code=location_code
            )
            container = ContainerListModel.objects.get(
                container_code=container_code
            )
            # One link row per location: update it if present, else create it.
            LocationContainerLink.objects.update_or_create(
                location=location,
                defaults={'container': container, 'is_active': True},
            )
            return True
        except Exception as e:
            raise RuntimeError(f"关联更新失败: {str(e)}") from e

    @staticmethod
    @transaction.atomic
    def update_batch_status(container_code, status):
        """Set ``status`` on the batch of every matching detail of a container.

        Only details that are not deleted and whose own status is 1 or 2 are
        considered. When there are none, a message is printed and the call
        still succeeds.

        :param container_code: code of the container whose batches to update
        :param status: new batch status value
        :return: True on success (including when nothing matched)
        :raises RuntimeError: if the container lookup or any save fails; the
            original exception is attached as ``__cause__``
        """
        try:
            container = ContainerListModel.objects.get(
                container_code=container_code
            )
            details = ContainerDetailModel.objects.select_related('batch').filter(
                container=container.id,
                is_delete=False,
                status__in=[1, 2],
            )
            # Truth-testing evaluates and caches the queryset, so the loop
            # below does not hit the database a second time.
            if not details:
                print(f"托盘 {container_code} 未组盘_from update_batch_status")
            for item in details:
                item.batch.status = status
                item.batch.save()
            return True
        except Exception as e:
            raise RuntimeError(f"批次状态更新失败: {str(e)}") from e

    @staticmethod
    @transaction.atomic
    def save_allocation_plan(batch_number, solution, pressure):
        """Persist the layer pressures and the allocation plan of a batch.

        :param batch_number: batch number used as the lookup key of the plan row
        :param solution: value stored in the plan's ``layer_pre_type`` field
        :param pressure: indexable with at least three items, stored as the
            layer 1..3 pressures on the ``base_location`` row with ``id=1``
        :return: None
        """
        # The decorator already provides the transaction; a nested
        # ``with transaction.atomic()`` would only add a redundant savepoint.
        base_location.objects.update_or_create(
            id=1,
            defaults={
                'layer1_pressure': pressure[0],
                'layer2_pressure': pressure[1],
                'layer3_pressure': pressure[2],
            },
        )
        alloction_pre.objects.update_or_create(
            batch_number=batch_number,
            defaults={'layer_pre_type': solution},
        )

    @staticmethod
    def update_pallet_count(batch_container_count, bound_number):
        """Store the container count on the batch identified by ``bound_number``.

        :param batch_container_count: value written to ``container_number``
        :param bound_number: bound number used to look the batch up
        :return: True on success, None when no such batch exists
        """
        batch_item = BoundBatchModel.objects.filter(
            bound_number=bound_number
        ).first()
        if not batch_item:
            print("批次号获取失败!")
            return None
        batch_item.container_number = batch_container_count
        batch_item.save()
        return True

    @staticmethod
    def update_group_status_reserved(location_group_list):
        """Mark every listed location group as ``'reserved'``.

        Each element must be a string containing an underscore; the segment
        after the first underscore is used as the group's primary key.

        NOTE(review): processing stops at the first empty or unknown entry and
        returns False, but groups saved before that point stay reserved.

        :param location_group_list: iterable of location-group identifier strings
        :return: True when all groups were updated (also for an empty list),
            False on the first invalid entry or on any exception
        """
        try:
            for location_group in location_group_list:
                # 1. Reject empty entries.
                if not location_group:
                    print("库位组获取失败!")
                    return False
                # 2. Resolve the group from the id embedded in the identifier.
                location_group_id = location_group.split('_')[1]
                location_group_item = LocationGroupModel.objects.filter(
                    id=location_group_id
                ).first()
                if not location_group_item:
                    print(f"库位组 {location_group} 不存在")
                    return False
                # 3. Mark the group as reserved.
                location_group_item.status = 'reserved'
                location_group_item.save()
            return True
        except Exception as e:
            logger.error(f"更新库位组状态失败:{str(e)}")
            print(f"更新库位组状态失败:{str(e)}")
            return False

    @staticmethod
    def update_current_finish_task(container, task_finish_number):
        """Store the per-layer finished-task counts on a batch's allocation plan.

        :param container: container identifier passed to
            ``LocationQueries.get_batch_info``
        :param task_finish_number: indexable with at least three items, stored
            as the layer 1..3 finished-task counts
        :return: True on success, None when the batch number or its plan row
            cannot be found
        """
        # Imported locally rather than at module level — presumably to avoid
        # a circular import with .queries; confirm before moving it.
        from .queries import LocationQueries

        batch = LocationQueries.get_batch_info(container).get('number')
        if not batch:
            return None
        solution = alloction_pre.objects.filter(batch_number=batch).first()
        if not solution:
            return None
        solution.layer1_task_finish_number = task_finish_number[0]
        solution.layer2_task_finish_number = task_finish_number[1]
        solution.layer3_task_finish_number = task_finish_number[2]
        solution.save()
        return True

    @staticmethod
    @transaction.atomic
    def update_location_group_status(location_code):
        """Recompute the status and counters of the group owning a location.

        The group becomes ``'available'`` when none of its locations is
        non-available, ``'full'`` when the number of non-available locations
        equals ``max_capacity``, and ``'occupied'`` otherwise.

        :param location_code: code of a location belonging to the group
        :return: True on success, False when the location or its group is
            missing or when any exception occurs
        """
        try:
            # 1. Look up the location.
            location = LocationModel.objects.filter(
                location_code=location_code
            ).first()
            if not location:
                print("库位获取失败!")
                return False
            # 2. Look up its group.
            location_group = LocationGroupModel.objects.filter(
                group_code=location.location_group
            ).first()
            if not location_group:
                print("库位组获取失败!")
                return False
            # Fetch the member locations once and derive both counters from
            # the same snapshot.
            location_items = list(location_group.location_items.all())
            current = sum(
                1 for item in location_items if item.status != 'available'
            )
            # 3. Derive the group status from the non-available count.
            if current == 0:
                location_group.status = 'available'
            elif current == location_group.max_capacity:
                location_group.status = 'full'
            else:
                location_group.status = 'occupied'
            location_group.current_goods_quantity = sum(
                item.current_quantity for item in location_items
            )
            location_group.current_quantity = current
            location_group.save()
            return True
        except Exception as e:
            logger.error(f"更新库位组状态失败:{str(e)}")
            print(f"更新库位组状态失败:{str(e)}")
            # Report failure explicitly, like the sibling methods, instead of
            # falling through and returning None.
            return False

    @staticmethod
    @transaction.atomic
    def update_location_group_batch(location, bound_number):
        """Set the current batch of the group that owns ``location``.

        :param location: location object; its ``location_group`` attribute is
            matched against the group's ``group_code``
        :param bound_number: value written to the group's ``current_batch``
        :return: True on success, False when the group is missing or on any
            exception
        """
        try:
            # 1. Look up the group.
            location_group = LocationGroupModel.objects.filter(
                group_code=location.location_group
            ).first()
            if not location_group:
                print("库位组获取失败!")
                return False
            # 2. Record the batch on the group.
            location_group.current_batch = bound_number
            location_group.save()
            print("更新库位组的批次成功!")
            return True
        except Exception as e:
            logger.error(f"更新库位组的批次失败:{str(e)}")
            print(f"更新库位组的批次失败:{str(e)}")
            return False

    @staticmethod
    def update_container_detail_status(container_code, status):
        """Set ``status`` on a container's non-deleted details.

        Details whose status is already 3 are left untouched.

        :param container_code: code of the container whose details to update
        :param status: new detail status value
        :return: True on success (also when no detail matched), False when
            the container does not exist or on any exception
        """
        try:
            # 1. Look up the container.
            container = ContainerListModel.objects.filter(
                container_code=container_code
            ).first()
            if not container:
                print(f"托盘 {container_code} 不存在")
                return False
            # 2. Update every remaining detail.
            container_detail = ContainerDetailModel.objects.filter(
                container=container.id, is_delete=False
            ).exclude(status=3)
            if not container_detail:
                print(f"托盘 {container_code} 未组盘_from update_container_detail_status")
                return True
            for detail in container_detail:
                detail.status = status
                detail.save()
            return True
        except Exception as e:
            logger.error(f"更新托盘状态失败:{str(e)}")
            print(f"更新托盘状态失败:{str(e)}")
            return False