from django.db import transaction
from django.db.models import Count, Q
from django.utils import timezone

from bin.models import LocationModel

from .models import LocationStatistics, LocationGroupStatistics


class LocationStatisticsService:
    """Compute, persist and query occupancy statistics for warehouse locations."""

    # Only these location types are counted. When saving, each one is written
    # to the ``<type>_total`` / ``<type>_used`` / ``<type>_available``
    # attributes of LocationStatistics (lower-cased type as prefix).
    TARGET_LOCATION_TYPES = ['T5', 'T4', 'S4', 'T2', 'T1']

    # Lower-cased status values. Any other status still counts towards
    # "total" but towards neither "used" nor "available".
    _USED_STATUSES = frozenset({'occupied', 'reserved'})
    _AVAILABLE_STATUS = 'available'

    _COUNTER_FIELDS = ('total', 'used', 'available')

    @staticmethod
    def _apply_scope(filters, warehouse_code, layer):
        """Add the optional warehouse/layer restrictions to *filters* and return it."""
        # A falsy warehouse_code (None or '') means "all warehouses";
        # layer is compared against None so that layer 0 is still a valid filter.
        if warehouse_code:
            filters['warehouse_code'] = warehouse_code
        if layer is not None:
            filters['layer'] = layer
        return filters

    @staticmethod
    def _new_counter():
        """Return a fresh total/used/available counter."""
        return {'total': 0, 'used': 0, 'available': 0}

    @staticmethod
    def _tally(counter, is_used, is_available):
        """Count one location in *counter* according to its status flags."""
        counter['total'] += 1
        if is_used:
            counter['used'] += 1
        elif is_available:
            counter['available'] += 1

    @staticmethod
    def _utilization(used, total):
        """Return used/total as a percentage rounded to 2 decimals (0 if total is 0)."""
        if total > 0:
            return round((used / total) * 100, 2)
        return 0

    @staticmethod
    def calculate_location_statistics(warehouse_code=None, layer=None):
        """
        Compute location statistics grouped by warehouse and layer.

        Only active locations whose type is in ``TARGET_LOCATION_TYPES`` are
        counted.

        Args:
            warehouse_code: Warehouse code; if falsy, all warehouses are included.
            layer: Layer; if None, all layers are included.

        Returns:
            dict keyed by ``"<warehouse_code>_<layer>"``. Each value holds
            ``warehouse_code``, ``warehouse_name``, ``layer``, the overall
            ``total`` / ``used`` / ``available`` counts, ``location_types``
            (counts per location type) and ``groups`` (counts per location
            group, each with a ``types`` breakdown per location type).
        """
        service = LocationStatisticsService

        filters = service._apply_scope(
            {
                'is_active': True,
                'location_type__in': service.TARGET_LOCATION_TYPES,
            },
            warehouse_code,
            layer,
        )

        statistics_data = {}
        for location in LocationModel.objects.filter(**filters):
            key = f"{location.warehouse_code}_{location.layer}"
            bucket = statistics_data.get(key)
            if bucket is None:
                bucket = statistics_data[key] = {
                    'warehouse_code': location.warehouse_code,
                    'warehouse_name': location.warehouse_name,
                    'layer': location.layer,
                    'location_types': {},
                    'groups': {},
                    'total': 0,
                    'used': 0,
                    'available': 0,
                }

            # Status comparison is case-insensitive; a missing status is
            # treated as an empty string (neither used nor available).
            status = (location.status or '').lower()
            is_used = status in service._USED_STATUSES
            is_available = status == service._AVAILABLE_STATUS
            loc_type = location.location_type

            # Overall counts for this warehouse/layer.
            service._tally(bucket, is_used, is_available)

            # Counts per location type.
            type_counter = bucket['location_types'].setdefault(
                loc_type, service._new_counter()
            )
            service._tally(type_counter, is_used, is_available)

            # Counts per location group.
            group_counter = bucket['groups'].setdefault(
                location.location_group,
                {'total': 0, 'used': 0, 'available': 0, 'types': {}},
            )
            service._tally(group_counter, is_used, is_available)

            # Location type distribution inside the group.
            group_type_counter = group_counter['types'].setdefault(
                loc_type, service._new_counter()
            )
            service._tally(group_type_counter, is_used, is_available)

        return statistics_data

    @staticmethod
    def save_statistics_to_db(statistics_data, warehouse_code=None, layer=None):
        """
        Persist a statistics snapshot and mark it as the latest one.

        Args:
            statistics_data: Mapping in the shape returned by
                ``calculate_location_statistics``.
            warehouse_code: Optional scope. When given, only existing rows of
                this warehouse lose their ``is_latest`` flag. Pass the same
                value that was used for the calculation so that a partial
                refresh does not un-flag other warehouses.
            layer: Optional scope, handled like ``warehouse_code``.

        With both scope arguments left at their defaults, every existing
        ``is_latest`` row is cleared, whatever *statistics_data* contains.

        Returns:
            Number of LocationStatistics rows created.
        """
        service = LocationStatisticsService
        current_time = timezone.now()

        # Build every row object before touching the database, so malformed
        # input fails without having cleared any "latest" flag.
        statistics_objects = []
        group_statistics_objects = []
        for data in statistics_data.values():
            stat = LocationStatistics(
                warehouse_code=data['warehouse_code'],
                warehouse_name=data['warehouse_name'],
                layer=data['layer'],
                statistic_time=current_time,
                is_latest=True,
                total_locations=data['total'],
                total_used=data['used'],
                total_available=data['available'],
                utilization_rate=service._utilization(data['used'], data['total']),
            )

            # Copy the per-type counts onto the t5_*/t4_*/s4_*/t2_*/t1_*
            # attributes; any other type has no matching attributes and is skipped.
            for loc_type, counts in data['location_types'].items():
                if loc_type not in service.TARGET_LOCATION_TYPES:
                    continue
                prefix = loc_type.lower()
                for field in service._COUNTER_FIELDS:
                    setattr(stat, f"{prefix}_{field}", counts[field])

            statistics_objects.append(stat)

            for group_name, group_data in data['groups'].items():
                group_statistics_objects.append(
                    LocationGroupStatistics(
                        warehouse_code=data['warehouse_code'],
                        warehouse_name=data['warehouse_name'],
                        layer=data['layer'],
                        location_group=group_name,
                        total_locations=group_data['total'],
                        used_locations=group_data['used'],
                        available_locations=group_data['available'],
                        utilization_rate=service._utilization(
                            group_data['used'], group_data['total']
                        ),
                        location_type_breakdown=group_data.get('types', {}),
                        statistic_time=current_time,
                    )
                )

        stale_filters = service._apply_scope(
            {'is_latest': True}, warehouse_code, layer
        )

        # Clear the old flags and insert the new rows atomically: if an insert
        # fails, the previous snapshot stays flagged as latest.
        # NOTE(review): atomic() targets the default database - confirm these
        # models are not routed to another connection.
        with transaction.atomic():
            LocationStatistics.objects.filter(**stale_filters).update(is_latest=False)
            LocationStatistics.objects.bulk_create(statistics_objects)
            LocationGroupStatistics.objects.bulk_create(group_statistics_objects)

        return len(statistics_objects)

    @staticmethod
    def get_latest_statistics(warehouse_code=None, layer=None):
        """
        Return the rows flagged as latest, ordered by warehouse code and layer.

        Args:
            warehouse_code: Warehouse code; if falsy, all warehouses are included.
            layer: Layer; if None, all layers are included.
        """
        filters = LocationStatisticsService._apply_scope(
            {'is_latest': True}, warehouse_code, layer
        )
        return LocationStatistics.objects.filter(**filters).order_by(
            'warehouse_code', 'layer'
        )

    @staticmethod
    def get_statistics_by_time_range(start_time, end_time, warehouse_code=None, layer=None):
        """
        Return rows whose statistic_time lies in [start_time, end_time].

        Results are ordered by statistic time, warehouse code and layer.

        Args:
            start_time: Inclusive lower bound for ``statistic_time``.
            end_time: Inclusive upper bound for ``statistic_time``.
            warehouse_code: Warehouse code; if falsy, all warehouses are included.
            layer: Layer; if None, all layers are included.
        """
        filters = LocationStatisticsService._apply_scope(
            {
                'statistic_time__gte': start_time,
                'statistic_time__lte': end_time,
            },
            warehouse_code,
            layer,
        )
        return LocationStatistics.objects.filter(**filters).order_by(
            'statistic_time', 'warehouse_code', 'layer'
        )