from django.db import transaction
from .models import *
from container.models import *
import logging

logger = logging.getLogger(__name__)


class LocationUpdates:
    """Write-side helpers for locations, location groups, containers and batches.

    Every method is a static method and is meant to be called on the class,
    e.g. ``LocationUpdates.link_container(location_code, container_code)``.

    Methods:
        link_container(location_code, container_code): activate a location/container link.
        disable_link_container(location_code, container_code): deactivate that link.
        update_batch_status(container_code, status): set the status of the batches on a container.
        save_allocation_plan(batch_number, solution, pressure): persist an allocation plan.
        update_pallet_count(batch_container_count, bound_number): store a batch's container count.
        update_group_status_reserved(location_group_list): mark location groups as reserved.
        update_current_finish_task(container, task_finish_number): store per-layer finished-task counts.
        update_location_group_status(location_code): recompute a location group's status.
        update_location_group_batch(location, bound_number): set a location group's current batch.
        update_container_detail_status(container_code, status): set the status of container details.
    """

    @staticmethod
    def link_container(location_code, container_code):
        """Link a container to a location and mark the link active.

        The ``LocationContainerLink`` row is looked up by location; it is
        created when missing and otherwise updated in place.

        :param location_code: code of the location to look up
        :param container_code: code of the container to look up
        :return: True on success
        :raises RuntimeError: if either lookup or the write fails
        """
        try:
            location = LocationModel.objects.get(location_code=location_code)
            container = ContainerListModel.objects.get(container_code=container_code)
            LocationContainerLink.objects.update_or_create(
                location=location,
                defaults={'container': container, 'is_active': True},
            )
            return True
        except Exception as e:
            # Chain the original error so the root cause stays in the traceback.
            raise RuntimeError(f"关联更新失败: {str(e)}") from e

    @staticmethod
    def disable_link_container(location_code, container_code):
        """Deactivate the link between a location and a container.

        A missing link row is not an error: the call still returns True.

        :param location_code: code of the location to look up
        :param container_code: code of the container to look up
        :return: True on success, False if a lookup or the write raised
        """
        try:
            location = LocationModel.objects.get(location_code=location_code)
            container = ContainerListModel.objects.get(container_code=container_code)
            link = LocationContainerLink.objects.filter(
                location=location,
                container=container,
            ).first()
            if link:
                link.is_active = False
                link.save()
            return True
        except Exception as e:
            logger.error(f"关联更新失败:{str(e)}")
            return False

    @staticmethod
    def update_batch_status(container_code, status):
        """Set ``status`` on the batch of every matching detail row of a container.

        Only detail rows with ``is_delete=False`` and ``status`` 1 or 2 are
        considered. NOTE(review): the meaning of those status codes is defined
        elsewhere - confirm against the model.

        :param container_code: code of the container to look up
        :param status: value written to ``batch.status``
        :return: True (also when no detail row matched)
        :raises RuntimeError: if the lookup or any write fails
        """
        try:
            container = ContainerListModel.objects.get(container_code=container_code)
            detail = ContainerDetailModel.objects.select_related('batch').filter(
                container=container.id,
                is_delete=False,
                status__in=[1, 2],
            )
            if not detail:
                print(f"托盘 {container_code} 未组盘_from update_batch_status")
            for item in detail:
                item.batch.status = status
                item.batch.save()
            return True
        except Exception as e:
            raise RuntimeError(f"批次状态更新失败: {str(e)}") from e

    @staticmethod
    def save_allocation_plan(batch_number, solution, pressure):
        """Persist the per-layer pressure values and the plan chosen for a batch.

        :param batch_number: batch the plan belongs to
        :param solution: value stored in ``layer_pre_type``
        :param pressure: indexable with at least three items (layers 1 to 3)
        """
        # Both rows describe one plan, so write them atomically: a failure on
        # the second write must not leave the pressures updated on their own.
        with transaction.atomic():
            base_location.objects.update_or_create(
                id=1,
                defaults={
                    'layer1_pressure': pressure[0],
                    'layer2_pressure': pressure[1],
                    'layer3_pressure': pressure[2],
                },
            )
            alloction_pre.objects.update_or_create(
                batch_number=batch_number,
                defaults={'layer_pre_type': solution},
            )

    @staticmethod
    def update_pallet_count(batch_container_count, bound_number):
        """Store the container count on the batch identified by ``bound_number``.

        :param batch_container_count: value written to ``container_number``
        :param bound_number: batch number used for the lookup
        :return: True on success, None if no such batch exists
        """
        batch_item = BoundBatchModel.objects.filter(bound_number=bound_number).first()
        if not batch_item:
            print("批次号获取失败!")
            return None
        batch_item.container_number = batch_container_count
        batch_item.save()
        return True

    @staticmethod
    def update_group_status_reserved(location_group_list):
        """Mark every listed location group as ``'reserved'``.

        Processing stops at the first entry that is empty or unknown; groups
        handled before that entry stay reserved.

        :param location_group_list: iterable of strings whose second
            ``'_'``-separated field is the ``LocationGroupModel`` id
        :return: True if every group was updated, otherwise False
        """
        try:
            for location_group in location_group_list:
                # 1. Reject empty entries.
                if not location_group:
                    print("库位组获取失败!")
                    return False
                # 2. Resolve the group from the id embedded in the entry.
                location_group_id = location_group.split('_')[1]
                location_group_item = LocationGroupModel.objects.filter(
                    id=location_group_id
                ).first()
                if not location_group_item:
                    print(f"库位组 {location_group} 不存在")
                    return False
                # 3. Update the group status.
                location_group_item.status = 'reserved'
                location_group_item.save()
            return True
        except Exception as e:
            logger.error(f"更新库位组状态失败:{str(e)}")
            print(f"更新库位组状态失败:{str(e)}")
            return False

    @staticmethod
    def update_current_finish_task(container, task_finish_number):
        """Store the per-layer finished-task counts on a batch's allocation plan.

        :param container: container identifier passed to ``LocationQueries.get_batch_info``
        :param task_finish_number: indexable with at least three items (layers 1 to 3)
        :return: True on success, None if the batch or its plan is not found
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import - TODO confirm).
        from .queries import LocationQueries

        batch = LocationQueries.get_batch_info(container).get('number')
        if not batch:
            return None
        solution = alloction_pre.objects.filter(batch_number=batch).first()
        if not solution:
            return None
        solution.layer1_task_finish_number = task_finish_number[0]
        solution.layer2_task_finish_number = task_finish_number[1]
        solution.layer3_task_finish_number = task_finish_number[2]
        solution.save()
        return True

    @staticmethod
    def update_location_group_status(location_code):
        """Recompute status and quantities of the group that owns a location.

        The group becomes ``'available'`` (and loses its current batch) when no
        member location is in use, ``'full'`` when the number of used locations
        equals ``max_capacity``, and ``'occupied'`` otherwise.

        :param location_code: code of a location inside the group
        :return: True on success, False if the location or group is missing
            or an error occurred
        """
        try:
            # 1. Look up the location.
            location = LocationModel.objects.filter(
                location_code=location_code
            ).first()
            if not location:
                print("库位获取失败!")
                return False

            # 2. Look up its group.
            location_group = LocationGroupModel.objects.filter(
                group_code=location.location_group
            ).first()
            if not location_group:
                print("库位组获取失败!")
                return False

            # Fetch the member locations once; both the count and the sum
            # below are derived from the same rows.
            location_items = list(location_group.location_items.all())
            current = sum(
                1 for location_item in location_items
                if location_item.status != 'available'
            )

            # 3. Update the group status.
            if current == 0:
                location_group.status = 'available'
                location_group.current_batch = None
            elif current == location_group.max_capacity:
                location_group.status = 'full'
            else:
                location_group.status = 'occupied'

            location_group.current_goods_quantity = sum(
                loc.current_quantity for loc in location_items
            )
            location_group.current_quantity = current

            location_group.save()
            return True
        except Exception as e:
            logger.error(f"更新库位组状态失败:{str(e)}")
            print(f"更新库位组状态失败:{str(e)}")
            # Report failure explicitly, like the sibling methods do,
            # instead of implicitly returning None.
            return False

    @staticmethod
    def update_location_group_batch(location, bound_number):
        """Set the current batch of the group that owns ``location``.

        :param location: location object; its ``location_group`` attribute is
            matched against ``LocationGroupModel.group_code``
        :param bound_number: value written to ``current_batch``
        :return: True on success, False if the group is missing or an error occurred
        """
        try:
            # 1. Look up the location group.
            location_group = LocationGroupModel.objects.filter(
                group_code=location.location_group
            ).first()
            if not location_group:
                print("库位组获取失败!")
                return False
            # 2. Update the group's batch.
            location_group.current_batch = bound_number
            location_group.save()
            print("更新库位组的批次成功!")
            return True
        except Exception as e:
            logger.error(f"更新库位组的批次失败:{str(e)}")
            print(f"更新库位组的批次失败:{str(e)}")
            return False

    @staticmethod
    def update_container_detail_status(container_code, status):
        """Set ``status`` on the non-deleted detail rows of a container.

        Rows whose status is 3 are left untouched. NOTE(review): the meaning
        of status 3 is defined elsewhere - confirm against the model.

        :param container_code: code of the container to look up
        :param status: value written to each detail row
        :return: True on success (also when no row matched), False if the
            container does not exist or an error occurred
        """
        try:
            # 1. Look up the container.
            container = ContainerListModel.objects.filter(
                container_code=container_code
            ).first()
            if not container:
                print(f"托盘 {container_code} 不存在")
                return False
            # 2. Update the status of its detail rows.
            container_detail = ContainerDetailModel.objects.filter(
                container=container.id, is_delete=False
            ).exclude(status=3)
            if not container_detail:
                print(f"托盘 {container_code} 未组盘_from update_container_detail_status")
                return True
            for detail in container_detail:
                detail.status = status
                detail.save()
            return True
        except Exception as e:
            logger.error(f"更新托盘状态失败:{str(e)}")
            print(f"更新托盘状态失败:{str(e)}")
            return False