from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from django.utils import timezone from datetime import timedelta from .services import LocationStatisticsService from .models import LocationGroupStatistics from django.db import transaction from django.db.models import Count, Q import logging logger = logging.getLogger(__name__) from .serializers import LocationStatisticsSerializer, LocationGroupStatisticsSerializer class LocationStatisticsView(APIView): """货位统计API视图""" def get(self, request): """获取货位统计信息""" warehouse_code = request.GET.get('warehouse_code') layer = request.GET.get('layer') # 尝试从参数转换层数 if layer is not None: try: layer = int(layer) except ValueError: layer = None # 从数据库获取最新统计 statistics = LocationStatisticsService.get_latest_statistics(warehouse_code, layer) serializer = LocationStatisticsSerializer(statistics, many=True) response_data = { 'timestamp': timezone.now().isoformat(), 'data': serializer.data } return Response(response_data) def post(self, request): """手动触发统计计算""" warehouse_code = request.data.get('warehouse_code') layer = request.data.get('layer') # 计算统计信息 stats_data = LocationStatisticsService.calculate_location_statistics(warehouse_code, layer) # 保存到数据库 count = LocationStatisticsService.save_statistics_to_db(stats_data) return Response({ 'message': f'成功统计了{count}条记录', 'timestamp': timezone.now().isoformat() }, status=status.HTTP_201_CREATED) class LocationStatisticsHistoryView(APIView): """货位统计历史数据API视图""" def get(self, request): """获取历史统计信息""" warehouse_code = request.GET.get('warehouse_code') layer = request.GET.get('layer') hours = int(request.GET.get('hours', 24)) # 默认最近24小时 end_time = timezone.now() start_time = end_time - timedelta(hours=hours) statistics = LocationStatisticsService.get_statistics_by_time_range( start_time, end_time, warehouse_code, layer ) serializer = LocationStatisticsSerializer(statistics, many=True) return Response({ 'period': f'{start_time} - {end_time}', 'data': serializer.data }) class LocationGroupStatisticsView(APIView): """货位组统计API视图""" def get(self, request): """获取货位组统计信息""" warehouse_code = request.GET.get('warehouse_code') layer = request.GET.get('layer') # 新增过滤参数 min_utilization = float(request.GET.get('min_utilization', 0)) # 最小使用率,默认0 min_used_locations = int(request.GET.get('min_used_locations', 0)) # 最小已用货位数,默认0 filters = {} if warehouse_code: filters['warehouse_code'] = warehouse_code if layer: try: filters['layer'] = int(layer) except ValueError: pass # 获取最新的组统计 from django.db.models import Max latest_stats = LocationGroupStatistics.objects.filter( **filters ).values('warehouse_code', 'layer', 'location_group').annotate( latest_time=Max('statistic_time') ) group_stats = [] for stat in latest_stats: latest_record = LocationGroupStatistics.objects.filter( warehouse_code=stat['warehouse_code'], layer=stat['layer'], location_group=stat['location_group'], statistic_time=stat['latest_time'] ).first() # 根据参数过滤 if latest_record: # 检查使用率条件 utilization_ok = latest_record.utilization_rate >= min_utilization # 检查已用货位数条件 used_locations_ok = latest_record.used_locations >= min_used_locations if utilization_ok and used_locations_ok: group_stats.append(latest_record) serializer = LocationGroupStatisticsSerializer(group_stats, many=True) return Response({ 'timestamp': timezone.now().isoformat(), 'filters': { 'min_utilization': min_utilization, 'min_used_locations': min_used_locations, 'total_records': len(group_stats) }, 'data': serializer.data }) class LocationConsistencyCheckView(APIView): """库位一致性检查API""" def post(self, request): warehouse_code = request.data.get('warehouse_code') layer = request.GET.get('layer') if int(layer) < 1 : layer = None auto_fix = request.data.get('auto_fix', False) checker = LocationConsistencyChecker(warehouse_code, layer, auto_fix) result = checker.check_all() return Response({ 'success': True, 'data': checker.generate_report() }) class LocationConsistencyChecker: TARGET_LOCATION_TYPES = ['T5', 'T4', 'S4', 'T2', 'T1'] """ 库位一致性检测器 用于检测库位状态与Link记录的一致性以及库位组的状态一致性 """ def __init__(self, warehouse_code=None, layer=None, auto_fix=False): """ 初始化检测器 Args: warehouse_code: 指定仓库代码,如果为None则检测所有仓库 layer: 指定楼层,如果为None则检测所有楼层 auto_fix: 是否自动修复检测到的问题 """ self.warehouse_code = warehouse_code self.layer = layer self.auto_fix = auto_fix self.results = { 'check_time': timezone.now(), 'location_errors': [], 'group_errors': [], 'fixed_locations': [], 'fixed_groups': [], 'repair_errors': [], 'summary': { 'total_locations': 0, 'checked_locations': 0, 'error_locations': 0, 'total_groups': 0, 'checked_groups': 0, 'error_groups': 0, 'fixed_location_count': 0, 'fixed_group_count': 0 } } def check_all(self): """执行所有检测""" logger.info(f"开始库位一致性检测,仓库: {self.warehouse_code}, 楼层: {self.layer}") # 检测库位状态与Link记录的一致性 self.check_location_link_consistency() # 检测库位组状态一致性 self.check_group_consistency() # 如果启用自动修复,执行修复 if self.auto_fix: self.fix_detected_issues() logger.info(f"库位一致性检测完成,发现{self.results['summary']['error_locations']}个库位问题," f"{self.results['summary']['error_groups']}个库位组问题") return self.results def check_location_link_consistency(self): """检测库位状态与Link记录的一致性""" from bin.models import LocationModel, LocationContainerLink # 构建查询条件 filters = {'is_active': True, 'location_type__in': self.TARGET_LOCATION_TYPES} if self.warehouse_code: filters['warehouse_code'] = self.warehouse_code if self.layer is not None: filters['layer'] = self.layer # 获取库位并预取Link记录 locations = LocationModel.objects.filter(**filters).prefetch_related('container_links') self.results['summary']['total_locations'] = locations.count() for location in locations: self.results['summary']['checked_locations'] += 1 # 获取该库位的活跃Link记录数量 active_links_count = location.container_links.filter(is_active=True).count() # 根据状态判断是否一致 is_consistent, expected_status, error_type = self._check_location_consistency( location.status, active_links_count ) if not is_consistent: self.results['summary']['error_locations'] += 1 self.results['location_errors'].append({ 'location_id': location.id, 'location_code': location.location_code, 'warehouse_code': location.warehouse_code, 'current_status': location.status, 'expected_status': expected_status, 'active_links_count': active_links_count, 'layer': location.layer, 'row': location.row, 'col': location.col, 'location_group': location.location_group, 'error_type': error_type, 'detected_at': timezone.now() }) def _check_location_consistency(self, current_status, active_links_count): """ 检查单个库位的状态一致性 Returns: tuple: (是否一致, 预期状态, 错误类型) """ if current_status == 'available': # available状态应该没有活跃的Link记录 if active_links_count > 0: return False, 'occupied', 'available_but_has_links' elif current_status in ['occupied','reserved']: # occupied状态应该至少有一个活跃的Link记录 if active_links_count != 1: return False, 'available', 'occupied_but_no_links' elif current_status in ['disabled', 'maintenance']: # 这些特殊状态可以有Link记录,但通常不应该有 if active_links_count > 0: return False, current_status, 'special_status_has_links' return True, current_status, None def check_group_consistency(self): """检测库位组状态的一致性""" from bin.models import LocationGroupModel # 构建查询条件 filters = {'is_active': True} if self.warehouse_code: filters['warehouse_code'] = self.warehouse_code if self.layer is not None: filters['layer'] = self.layer # 获取库位组并预取关联的库位 groups = LocationGroupModel.objects.filter(**filters).prefetch_related('location_items') self.results['summary']['total_groups'] = groups.count() for group in groups: self.results['summary']['checked_groups'] += 1 # 获取组内所有库位 group_locations = group.location_items.filter(is_active=True) # 统计组内库位的状态分布 status_counts = self._get_group_status_distribution(group_locations) total_locations = group_locations.count() # 根据组内库位状态推断组应该的状态 is_consistent, expected_status, error_type = self._check_group_consistency( group.status, status_counts, total_locations ) if not is_consistent: self.results['summary']['error_groups'] += 1 self.results['group_errors'].append({ 'group_id': group.id, 'group_code': group.group_code, 'group_name': group.group_name, 'warehouse_code': group.warehouse_code, 'current_status': group.status, 'expected_status': expected_status, 'total_locations': total_locations, 'status_distribution': status_counts, 'layer': group.layer, 'error_type': error_type, 'detected_at': timezone.now() }) def _get_group_status_distribution(self, group_locations): """获取组内库位状态分布""" return group_locations.aggregate( available_count=Count('id', filter=Q(status='available')), occupied_count=Count('id', filter=Q(status='occupied')), disabled_count=Count('id', filter=Q(status='disabled')), reserved_count=Count('id', filter=Q(status='reserved')), maintenance_count=Count('id', filter=Q(status='maintenance')) ) def _check_group_consistency(self, current_status, status_counts, total_locations): """ 检查库位组状态一致性 Returns: tuple: (是否一致, 预期状态, 错误类型) """ if total_locations == 0: # 空组应该是available或disabled if current_status not in ['available', 'disabled']: return False, 'available', 'empty_group_wrong_status' else: # 根据库位状态分布推断组状态 if status_counts['occupied_count'] == total_locations: # 所有库位都被占用,组状态应该是full if current_status != 'full': return False, 'full', 'all_occupied_but_not_full' elif status_counts['occupied_count'] > 0: # 有库位被占用,组状态应该是occupied if current_status != 'occupied': return False, 'occupied', 'has_occupied_but_wrong_status' elif status_counts['available_count'] == total_locations: # 所有库位都可用,组状态应该是available if current_status != 'available': return False, 'available', 'all_available_but_wrong_status' # 检查特殊状态 elif status_counts['disabled_count'] > 0 and current_status != 'disabled': return False, 'disabled', 'has_disabled_but_wrong_status' elif status_counts['maintenance_count'] > 0 and current_status != 'maintenance': return False, 'maintenance', 'has_maintenance_but_wrong_status' return True, current_status, None def fix_detected_issues(self): """修复检测到的不一致问题""" logger.info("开始修复检测到的不一致问题") # 修复库位状态不一致 if self.results['location_errors']: self._fix_location_issues() # 修复库位组状态不一致 if self.results['group_errors']: self._fix_group_issues() logger.info(f"修复完成: {self.results['summary']['fixed_location_count']}个库位, " f"{self.results['summary']['fixed_group_count']}个库位组") def _fix_location_issues(self): """修复库位状态不一致问题""" from bin.models import LocationModel for error in self.results['location_errors']: try: with transaction.atomic(): location = LocationModel.objects.select_for_update().get(id=error['location_id']) # 只有明确有预期状态时才修复 if error['expected_status'] and error['expected_status'] != 'need_check': old_status = location.status location.status = error['expected_status'] location.save() self.results['summary']['fixed_location_count'] += 1 self.results['fixed_locations'].append({ 'location_id': location.id, 'location_code': location.location_code, 'old_status': old_status, 'new_status': location.status, 'fixed_at': timezone.now(), 'error_type': error['error_type'] }) else: # 标记为需要手动检查 self.results['fixed_locations'].append({ 'location_id': location.id, 'location_code': location.location_code, 'status': '需要手动检查', 'reason': '无法自动确定正确状态', 'error_type': error['error_type'] }) except Exception as e: logger.error(f"修复库位{error.get('location_id')}时出错: {str(e)}") self.results['repair_errors'].append({ 'type': 'location_fix_error', 'location_id': error.get('location_id'), 'error_message': str(e), 'error_type': error.get('error_type') }) def _fix_group_issues(self): """修复库位组状态不一致问题""" from bin.models import LocationGroupModel for error in self.results['group_errors']: try: with transaction.atomic(): group = LocationGroupModel.objects.select_for_update().get(id=error['group_id']) # 只有明确有预期状态时才修复 if error['expected_status'] and error['expected_status'] != 'need_check': old_status = group.status group.status = error['expected_status'] group.save() self.results['summary']['fixed_group_count'] += 1 self.results['fixed_groups'].append({ 'group_id': group.id, 'group_code': group.group_code, 'old_status': old_status, 'new_status': group.status, 'fixed_at': timezone.now(), 'error_type': error['error_type'] }) else: # 标记为需要手动检查 self.results['fixed_groups'].append({ 'group_id': group.id, 'group_code': group.group_code, 'status': '需要手动检查', 'reason': '无法自动确定正确状态', 'error_type': error.get('error_type') }) except Exception as e: logger.error(f"修复库位组{error.get('group_id')}时出错: {str(e)}") self.results['repair_errors'].append({ 'type': 'group_fix_error', 'group_id': error.get('group_id'), 'error_message': str(e), 'error_type': error.get('error_type') }) def get_summary(self): """获取检测摘要""" return { 'check_time': self.results['check_time'], 'total_checked': { 'locations': self.results['summary']['checked_locations'], 'groups': self.results['summary']['checked_groups'] }, 'errors_found': { 'locations': self.results['summary']['error_locations'], 'groups': self.results['summary']['error_groups'] }, 'fixed': { 'locations': self.results['summary']['fixed_location_count'], 'groups': self.results['summary']['fixed_group_count'] }, 'has_errors': len(self.results['repair_errors']) > 0 } def generate_report(self): """生成检测报告""" summary = self.get_summary() report = { 'summary': summary, 'details': { 'location_errors': self.results['location_errors'], 'group_errors': self.results['group_errors'], 'fixed_locations': self.results['fixed_locations'], 'fixed_groups': self.results['fixed_groups'], 'repair_errors': self.results['repair_errors'] } } return report # 便捷函数 def check_location_consistency(warehouse_code=None, layer=None, auto_fix=False): """ 便捷函数:检查库位一致性 Args: warehouse_code: 仓库代码 layer: 楼层 auto_fix: 是否自动修复 Returns: dict: 检测结果 """ checker = LocationConsistencyChecker(warehouse_code, layer, auto_fix) return checker.check_all() def get_consistency_report(warehouse_code=None, layer=None, auto_fix=False): """ 获取详细的检测报告 Args: warehouse_code: 仓库代码 layer: 楼层 auto_fix: 是否自动修复 Returns: dict: 检测报告 """ checker = LocationConsistencyChecker(warehouse_code, layer, auto_fix) checker.check_all() return checker.generate_report()