# services.py
from django.db import transaction
from django.db.models import Count, Q
from django.utils import timezone

from bin.models import LocationModel

from .models import LocationStatistics, LocationGroupStatistics
  5. class LocationStatisticsService:
  6. """货位统计服务类"""
  7. TARGET_LOCATION_TYPES = ['T5', 'T4', 'S4', 'T2', 'T1']
  8. @staticmethod
  9. def calculate_location_statistics(warehouse_code=None, layer=None):
  10. """
  11. 计算货位统计信息
  12. Args:
  13. warehouse_code: 仓库代码,如果为None则统计所有仓库
  14. layer: 楼层,如果为None则统计所有楼层
  15. """
  16. # 构建查询条件
  17. # 构建查询条件 - 只统计指定的货位类型
  18. filters = {
  19. 'is_active': True,
  20. 'location_type__in': LocationStatisticsService.TARGET_LOCATION_TYPES
  21. }
  22. if warehouse_code:
  23. filters['warehouse_code'] = warehouse_code
  24. if layer is not None:
  25. filters['layer'] = layer
  26. # 获取所有符合条件的货位
  27. locations = LocationModel.objects.filter(**filters)
  28. # 按仓库和楼层分组统计
  29. statistics_data = {}
  30. for location in locations:
  31. key = f"{location.warehouse_code}_{location.layer}"
  32. if key not in statistics_data:
  33. statistics_data[key] = {
  34. 'warehouse_code': location.warehouse_code,
  35. 'warehouse_name': location.warehouse_name,
  36. 'layer': location.layer,
  37. 'location_types': {},
  38. 'groups': {},
  39. 'total': 0,
  40. 'used': 0,
  41. 'available': 0
  42. }
  43. # 统计货位类型
  44. loc_type = location.location_type
  45. if loc_type not in statistics_data[key]['location_types']:
  46. statistics_data[key]['location_types'][loc_type] = {
  47. 'total': 0, 'used': 0, 'available': 0
  48. }
  49. statistics_data[key]['location_types'][loc_type]['total'] += 1
  50. statistics_data[key]['total'] += 1
  51. # 根据状态统计
  52. if location.status in ['occupied', 'reserved']:
  53. statistics_data[key]['location_types'][loc_type]['used'] += 1
  54. statistics_data[key]['used'] += 1
  55. elif location.status == 'available':
  56. statistics_data[key]['location_types'][loc_type]['available'] += 1
  57. statistics_data[key]['available'] += 1
  58. # 统计货位组
  59. group = location.location_group
  60. if group not in statistics_data[key]['groups']:
  61. statistics_data[key]['groups'][group] = {
  62. 'total': 0, 'used': 0, 'available': 0
  63. }
  64. statistics_data[key]['groups'][group]['total'] += 1
  65. if location.status == 'occupied':
  66. statistics_data[key]['groups'][group]['used'] += 1
  67. elif location.status == 'available':
  68. statistics_data[key]['groups'][group]['available'] += 1
  69. return statistics_data
  70. @staticmethod
  71. def save_statistics_to_db(statistics_data):
  72. """将统计结果保存到数据库"""
  73. current_time = timezone.now()
  74. # 先将之前的最新标记清除
  75. LocationStatistics.objects.filter(is_latest=True).update(is_latest=False)
  76. statistics_objects = []
  77. group_statistics_objects = []
  78. for key, data in statistics_data.items():
  79. # 计算使用率
  80. utilization_rate = 0
  81. if data['total'] > 0:
  82. utilization_rate = round((data['used'] / data['total']) * 100, 2)
  83. # 创建主统计对象
  84. stat = LocationStatistics(
  85. warehouse_code=data['warehouse_code'],
  86. warehouse_name=data['warehouse_name'],
  87. layer=data['layer'],
  88. statistic_time=current_time,
  89. is_latest=True,
  90. total_locations=data['total'],
  91. total_used=data['used'],
  92. total_available=data['available'],
  93. utilization_rate=utilization_rate
  94. )
  95. # 设置各类型货位统计
  96. loc_types = data['location_types']
  97. for loc_type, counts in loc_types.items():
  98. if loc_type == 'T5':
  99. stat.t5_total = counts['total']
  100. stat.t5_used = counts['used']
  101. stat.t5_available = counts['available']
  102. elif loc_type == 'T4':
  103. stat.t4_total = counts['total']
  104. stat.t4_used = counts['used']
  105. stat.t4_available = counts['available']
  106. elif loc_type == 'S4':
  107. stat.s4_total = counts['total']
  108. stat.s4_used = counts['used']
  109. stat.s4_available = counts['available']
  110. elif loc_type == 'T2':
  111. stat.t2_total = counts['total']
  112. stat.t2_used = counts['used']
  113. stat.t2_available = counts['available']
  114. elif loc_type == 'T1':
  115. stat.t1_total = counts['total']
  116. stat.t1_used = counts['used']
  117. stat.t1_available = counts['available']
  118. statistics_objects.append(stat)
  119. # 创建货位组统计对象
  120. for group_name, group_data in data['groups'].items():
  121. group_utilization = 0
  122. if group_data['total'] > 0:
  123. group_utilization = round((group_data['used'] / group_data['total']) * 100, 2)
  124. group_stat = LocationGroupStatistics(
  125. warehouse_code=data['warehouse_code'],
  126. warehouse_name=data['warehouse_name'],
  127. layer=data['layer'],
  128. location_group=group_name,
  129. total_locations=group_data['total'],
  130. used_locations=group_data['used'],
  131. available_locations=group_data['available'],
  132. utilization_rate=group_utilization,
  133. location_type_breakdown={}, # 这里可以进一步细化
  134. statistic_time=current_time
  135. )
  136. group_statistics_objects.append(group_stat)
  137. # 批量保存
  138. LocationStatistics.objects.bulk_create(statistics_objects)
  139. LocationGroupStatistics.objects.bulk_create(group_statistics_objects)
  140. return len(statistics_objects)
  141. @staticmethod
  142. def get_latest_statistics(warehouse_code=None, layer=None):
  143. """获取最新统计信息"""
  144. filters = {'is_latest': True}
  145. if warehouse_code:
  146. filters['warehouse_code'] = warehouse_code
  147. if layer is not None:
  148. filters['layer'] = layer
  149. return LocationStatistics.objects.filter(**filters).order_by('warehouse_code', 'layer')
  150. @staticmethod
  151. def get_statistics_by_time_range(start_time, end_time, warehouse_code=None, layer=None):
  152. """按时间范围获取统计信息"""
  153. filters = {
  154. 'statistic_time__gte': start_time,
  155. 'statistic_time__lte': end_time
  156. }
  157. if warehouse_code:
  158. filters['warehouse_code'] = warehouse_code
  159. if layer is not None:
  160. filters['layer'] = layer
  161. return LocationStatistics.objects.filter(**filters).order_by('statistic_time', 'warehouse_code', 'layer')