| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- from django.db.models import Count, Q
- from django.utils import timezone
- from bin.models import LocationModel
- from .models import LocationStatistics, LocationGroupStatistics
class LocationStatisticsService:
    """Compute, persist and query storage-location statistics.

    Statistics are grouped per warehouse and layer. Within each of those
    buckets, counts are additionally broken down by location type and by
    location group.
    """

    # Location types that are counted. ``save_statistics_to_db`` writes each
    # of them to the ``<type>_total`` / ``<type>_used`` / ``<type>_available``
    # attributes of ``LocationStatistics`` (type name lower-cased).
    TARGET_LOCATION_TYPES = ['T5', 'T4', 'S4', 'T2', 'T1']

    # Status values (compared case-insensitively) that count as "used".
    _USED_STATUSES = frozenset(('occupied', 'reserved'))
    # Status value (compared case-insensitively) that counts as "available".
    _AVAILABLE_STATUS = 'available'
    # Counter names shared by every tally dict and by the per-type columns.
    _METRICS = ('total', 'used', 'available')

    @staticmethod
    def _scope_filters(warehouse_code, layer, base=None):
        """Return ``base`` filters extended with the optional warehouse/layer scope."""
        filters = dict(base or {})
        if warehouse_code:
            filters['warehouse_code'] = warehouse_code
        # ``layer`` is compared against None so that layer 0 is still a valid filter.
        if layer is not None:
            filters['layer'] = layer
        return filters

    @staticmethod
    def _new_counts():
        """Return a fresh zeroed total/used/available counter dict."""
        return {'total': 0, 'used': 0, 'available': 0}

    @staticmethod
    def _tally(counts, is_used, is_available):
        """Add one location to ``counts``; a location is used, available, or neither."""
        counts['total'] += 1
        if is_used:
            counts['used'] += 1
        elif is_available:
            counts['available'] += 1

    @staticmethod
    def _utilization_rate(used, total):
        """Return used/total as a percentage rounded to 2 places, or 0 when total is 0."""
        if total > 0:
            return round((used / total) * 100, 2)
        return 0

    @staticmethod
    def _aggregate_locations(locations):
        """Fold an iterable of location objects into the nested statistics dict.

        Each item must expose ``warehouse_code``, ``warehouse_name``, ``layer``,
        ``location_type``, ``location_group`` and ``status`` attributes.
        """
        service = LocationStatisticsService
        statistics_data = {}

        for location in locations:
            key = f"{location.warehouse_code}_{location.layer}"
            bucket = statistics_data.get(key)
            if bucket is None:
                bucket = statistics_data[key] = {
                    'warehouse_code': location.warehouse_code,
                    'warehouse_name': location.warehouse_name,
                    'layer': location.layer,
                    'location_types': {},
                    'groups': {},
                    'total': 0,
                    'used': 0,
                    'available': 0,
                }

            # A missing status is treated as an empty string, so such a
            # location is counted in the totals only.
            status = (location.status or '').lower()
            is_used = status in service._USED_STATUSES
            is_available = status == service._AVAILABLE_STATUS
            loc_type = location.location_type

            # Warehouse/layer totals.
            service._tally(bucket, is_used, is_available)

            # Breakdown by location type.
            type_counts = bucket['location_types'].get(loc_type)
            if type_counts is None:
                type_counts = bucket['location_types'][loc_type] = service._new_counts()
            service._tally(type_counts, is_used, is_available)

            # Breakdown by location group.
            group = location.location_group
            group_counts = bucket['groups'].get(group)
            if group_counts is None:
                group_counts = bucket['groups'][group] = {
                    'total': 0,
                    'used': 0,
                    'available': 0,
                    'types': {},
                }
            service._tally(group_counts, is_used, is_available)

            # Distribution of location types inside the group.
            group_type_counts = group_counts['types'].get(loc_type)
            if group_type_counts is None:
                group_type_counts = group_counts['types'][loc_type] = service._new_counts()
            service._tally(group_type_counts, is_used, is_available)

        return statistics_data

    @staticmethod
    def calculate_location_statistics(warehouse_code=None, layer=None):
        """Compute location statistics from the current ``LocationModel`` rows.

        Only rows with ``is_active=True`` and a ``location_type`` listed in
        ``TARGET_LOCATION_TYPES`` are counted.

        Args:
            warehouse_code: Warehouse code to restrict to; a falsy value means
                all warehouses.
            layer: Layer to restrict to; ``None`` means all layers.

        Returns:
            A dict keyed by ``"<warehouse_code>_<layer>"``. Each value holds
            ``warehouse_code``, ``warehouse_name``, ``layer``, the overall
            ``total`` / ``used`` / ``available`` counts, ``location_types``
            (counts per location type) and ``groups`` (counts per location
            group, each with a ``types`` breakdown).
        """
        service = LocationStatisticsService
        filters = service._scope_filters(
            warehouse_code,
            layer,
            {
                'is_active': True,
                'location_type__in': service.TARGET_LOCATION_TYPES,
            },
        )
        locations = LocationModel.objects.filter(**filters)
        return service._aggregate_locations(locations)

    @staticmethod
    def save_statistics_to_db(statistics_data):
        """Persist a statistics dict as a new snapshot.

        One ``LocationStatistics`` row is created per entry of
        ``statistics_data`` and one ``LocationGroupStatistics`` row per
        location group inside it, all stamped with the same time.

        Only the previous "latest" rows of the warehouse/layer pairs present
        in ``statistics_data`` are demoted, so saving the result of a
        warehouse- or layer-scoped calculation does not hide the latest
        snapshot of the other warehouses. Pairs that are absent from
        ``statistics_data`` keep their current ``is_latest`` flag.

        All writes happen in a single transaction so that a failed insert
        cannot leave the table without a latest snapshot.

        Args:
            statistics_data: Dict in the shape returned by
                ``calculate_location_statistics``.

        Returns:
            The number of ``LocationStatistics`` rows created (0 for empty
            input, in which case nothing is written).
        """
        from django.db import transaction

        if not statistics_data:
            return 0

        service = LocationStatisticsService
        current_time = timezone.now()

        superseded = Q()
        statistics_objects = []
        group_statistics_objects = []

        # Build every object before touching the database, so that malformed
        # input fails before any flag has been changed.
        for data in statistics_data.values():
            superseded |= Q(
                warehouse_code=data['warehouse_code'],
                layer=data['layer'],
            )

            stat = LocationStatistics(
                warehouse_code=data['warehouse_code'],
                warehouse_name=data['warehouse_name'],
                layer=data['layer'],
                statistic_time=current_time,
                is_latest=True,
                total_locations=data['total'],
                total_used=data['used'],
                total_available=data['available'],
                utilization_rate=service._utilization_rate(data['used'], data['total']),
            )

            # Per-type counters, e.g. 'T5' -> t5_total / t5_used / t5_available.
            for loc_type, counts in data['location_types'].items():
                if loc_type not in service.TARGET_LOCATION_TYPES:
                    continue
                prefix = loc_type.lower()
                for metric in service._METRICS:
                    setattr(stat, f"{prefix}_{metric}", counts[metric])

            statistics_objects.append(stat)

            for group_name, group_data in data['groups'].items():
                group_statistics_objects.append(
                    LocationGroupStatistics(
                        warehouse_code=data['warehouse_code'],
                        warehouse_name=data['warehouse_name'],
                        layer=data['layer'],
                        location_group=group_name,
                        total_locations=group_data['total'],
                        used_locations=group_data['used'],
                        available_locations=group_data['available'],
                        utilization_rate=service._utilization_rate(
                            group_data['used'], group_data['total']
                        ),
                        location_type_breakdown=group_data.get('types', {}),
                        statistic_time=current_time,
                    )
                )

        with transaction.atomic():
            # Demote the snapshots that are being replaced, then insert the new ones.
            LocationStatistics.objects.filter(superseded, is_latest=True).update(
                is_latest=False
            )
            LocationStatistics.objects.bulk_create(statistics_objects)
            LocationGroupStatistics.objects.bulk_create(group_statistics_objects)

        return len(statistics_objects)

    @staticmethod
    def get_latest_statistics(warehouse_code=None, layer=None):
        """Return the rows flagged ``is_latest``, ordered by warehouse code and layer.

        Args:
            warehouse_code: Warehouse code to restrict to; a falsy value means
                all warehouses.
            layer: Layer to restrict to; ``None`` means all layers.

        Returns:
            A ``LocationStatistics`` queryset.
        """
        filters = LocationStatisticsService._scope_filters(
            warehouse_code, layer, {'is_latest': True}
        )
        return LocationStatistics.objects.filter(**filters).order_by(
            'warehouse_code', 'layer'
        )

    @staticmethod
    def get_statistics_by_time_range(start_time, end_time, warehouse_code=None, layer=None):
        """Return rows whose ``statistic_time`` lies within an inclusive range.

        Args:
            start_time: Lower bound of ``statistic_time`` (inclusive).
            end_time: Upper bound of ``statistic_time`` (inclusive).
            warehouse_code: Warehouse code to restrict to; a falsy value means
                all warehouses.
            layer: Layer to restrict to; ``None`` means all layers.

        Returns:
            A ``LocationStatistics`` queryset ordered by statistic time,
            warehouse code and layer.
        """
        filters = LocationStatisticsService._scope_filters(
            warehouse_code,
            layer,
            {
                'statistic_time__gte': start_time,
                'statistic_time__lte': end_time,
            },
        )
        return LocationStatistics.objects.filter(**filters).order_by(
            'statistic_time', 'warehouse_code', 'layer'
        )
|