# services.py
from django.db import transaction
from django.db.models import Count, Q
from django.utils import timezone

from bin.models import LocationModel

from .models import LocationStatistics, LocationGroupStatistics
  5. class LocationStatisticsService:
  6. """货位统计服务类"""
  7. TARGET_LOCATION_TYPES = ['T5', 'T4', 'S4', 'T2', 'T1']
  8. @staticmethod
  9. def calculate_location_statistics(warehouse_code=None, layer=None):
  10. """
  11. 计算货位统计信息
  12. Args:
  13. warehouse_code: 仓库代码,如果为None则统计所有仓库
  14. layer: 楼层,如果为None则统计所有楼层
  15. """
  16. # 构建查询条件
  17. # 构建查询条件 - 只统计指定的货位类型
  18. filters = {
  19. 'is_active': True,
  20. 'location_type__in': LocationStatisticsService.TARGET_LOCATION_TYPES
  21. }
  22. if warehouse_code:
  23. filters['warehouse_code'] = warehouse_code
  24. if layer is not None:
  25. filters['layer'] = layer
  26. # 获取所有符合条件的货位
  27. locations = LocationModel.objects.filter(**filters)
  28. # 按仓库和楼层分组统计
  29. statistics_data = {}
  30. for location in locations:
  31. key = f"{location.warehouse_code}_{location.layer}"
  32. if key not in statistics_data:
  33. statistics_data[key] = {
  34. 'warehouse_code': location.warehouse_code,
  35. 'warehouse_name': location.warehouse_name,
  36. 'layer': location.layer,
  37. 'location_types': {},
  38. 'groups': {},
  39. 'total': 0,
  40. 'used': 0,
  41. 'available': 0
  42. }
  43. status = (location.status or '').lower()
  44. is_used = status in {'occupied', 'reserved'}
  45. is_available = status == 'available'
  46. # 统计货位类型
  47. loc_type = location.location_type
  48. if loc_type not in statistics_data[key]['location_types']:
  49. statistics_data[key]['location_types'][loc_type] = {
  50. 'total': 0, 'used': 0, 'available': 0
  51. }
  52. statistics_data[key]['location_types'][loc_type]['total'] += 1
  53. statistics_data[key]['total'] += 1
  54. # 根据状态统计
  55. if is_used:
  56. statistics_data[key]['location_types'][loc_type]['used'] += 1
  57. statistics_data[key]['used'] += 1
  58. elif is_available:
  59. statistics_data[key]['location_types'][loc_type]['available'] += 1
  60. statistics_data[key]['available'] += 1
  61. # 统计货位组
  62. group = location.location_group
  63. if group not in statistics_data[key]['groups']:
  64. statistics_data[key]['groups'][group] = {
  65. 'total': 0,
  66. 'used': 0,
  67. 'available': 0,
  68. 'types': {}
  69. }
  70. statistics_data[key]['groups'][group]['total'] += 1
  71. if is_used:
  72. statistics_data[key]['groups'][group]['used'] += 1
  73. elif is_available:
  74. statistics_data[key]['groups'][group]['available'] += 1
  75. # 统计组内货位类型分布
  76. group_types = statistics_data[key]['groups'][group]['types']
  77. if loc_type not in group_types:
  78. group_types[loc_type] = {'total': 0, 'used': 0, 'available': 0}
  79. group_types[loc_type]['total'] += 1
  80. if is_used:
  81. group_types[loc_type]['used'] += 1
  82. elif is_available:
  83. group_types[loc_type]['available'] += 1
  84. return statistics_data
  85. @staticmethod
  86. def save_statistics_to_db(statistics_data):
  87. """将统计结果保存到数据库"""
  88. current_time = timezone.now()
  89. # 先将之前的最新标记清除
  90. LocationStatistics.objects.filter(is_latest=True).update(is_latest=False)
  91. statistics_objects = []
  92. group_statistics_objects = []
  93. for key, data in statistics_data.items():
  94. # 计算使用率
  95. utilization_rate = 0
  96. if data['total'] > 0:
  97. utilization_rate = round((data['used'] / data['total']) * 100, 2)
  98. # 创建主统计对象
  99. stat = LocationStatistics(
  100. warehouse_code=data['warehouse_code'],
  101. warehouse_name=data['warehouse_name'],
  102. layer=data['layer'],
  103. statistic_time=current_time,
  104. is_latest=True,
  105. total_locations=data['total'],
  106. total_used=data['used'],
  107. total_available=data['available'],
  108. utilization_rate=utilization_rate
  109. )
  110. # 设置各类型货位统计
  111. loc_types = data['location_types']
  112. for loc_type, counts in loc_types.items():
  113. if loc_type == 'T5':
  114. stat.t5_total = counts['total']
  115. stat.t5_used = counts['used']
  116. stat.t5_available = counts['available']
  117. elif loc_type == 'T4':
  118. stat.t4_total = counts['total']
  119. stat.t4_used = counts['used']
  120. stat.t4_available = counts['available']
  121. elif loc_type == 'S4':
  122. stat.s4_total = counts['total']
  123. stat.s4_used = counts['used']
  124. stat.s4_available = counts['available']
  125. elif loc_type == 'T2':
  126. stat.t2_total = counts['total']
  127. stat.t2_used = counts['used']
  128. stat.t2_available = counts['available']
  129. elif loc_type == 'T1':
  130. stat.t1_total = counts['total']
  131. stat.t1_used = counts['used']
  132. stat.t1_available = counts['available']
  133. statistics_objects.append(stat)
  134. # 创建货位组统计对象
  135. for group_name, group_data in data['groups'].items():
  136. group_utilization = 0
  137. if group_data['total'] > 0:
  138. group_utilization = round((group_data['used'] / group_data['total']) * 100, 2)
  139. group_stat = LocationGroupStatistics(
  140. warehouse_code=data['warehouse_code'],
  141. warehouse_name=data['warehouse_name'],
  142. layer=data['layer'],
  143. location_group=group_name,
  144. total_locations=group_data['total'],
  145. used_locations=group_data['used'],
  146. available_locations=group_data['available'],
  147. utilization_rate=group_utilization,
  148. location_type_breakdown=group_data.get('types', {}),
  149. statistic_time=current_time
  150. )
  151. group_statistics_objects.append(group_stat)
  152. # 批量保存
  153. LocationStatistics.objects.bulk_create(statistics_objects)
  154. LocationGroupStatistics.objects.bulk_create(group_statistics_objects)
  155. return len(statistics_objects)
  156. @staticmethod
  157. def get_latest_statistics(warehouse_code=None, layer=None):
  158. """获取最新统计信息"""
  159. filters = {'is_latest': True}
  160. if warehouse_code:
  161. filters['warehouse_code'] = warehouse_code
  162. if layer is not None:
  163. filters['layer'] = layer
  164. return LocationStatistics.objects.filter(**filters).order_by('warehouse_code', 'layer')
  165. @staticmethod
  166. def get_statistics_by_time_range(start_time, end_time, warehouse_code=None, layer=None):
  167. """按时间范围获取统计信息"""
  168. filters = {
  169. 'statistic_time__gte': start_time,
  170. 'statistic_time__lte': end_time
  171. }
  172. if warehouse_code:
  173. filters['warehouse_code'] = warehouse_code
  174. if layer is not None:
  175. filters['layer'] = layer
  176. return LocationStatistics.objects.filter(**filters).order_by('statistic_time', 'warehouse_code', 'layer')