from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from django.utils import timezone from datetime import timedelta from .services import LocationStatisticsService from .models import LocationGroupStatistics from django.db import transaction from django.db.models import Count, Q, Prefetch from decimal import Decimal, InvalidOperation import logging logger = logging.getLogger(__name__) from .serializers import LocationStatisticsSerializer, LocationGroupStatisticsSerializer class LocationStatisticsView(APIView): """货位统计API视图""" def get(self, request): """获取货位统计信息""" warehouse_code = request.GET.get('warehouse_code') layer = request.GET.get('layer') # 尝试从参数转换层数 if layer is not None: try: layer = int(layer) except ValueError: layer = None # 从数据库获取最新统计 statistics = LocationStatisticsService.get_latest_statistics(warehouse_code, layer) serializer = LocationStatisticsSerializer(statistics, many=True) response_data = { 'timestamp': timezone.now().isoformat(), 'data': serializer.data } return Response(response_data) def post(self, request): """手动触发统计计算""" warehouse_code = request.data.get('warehouse_code') layer = request.data.get('layer') # 计算统计信息 stats_data = LocationStatisticsService.calculate_location_statistics(warehouse_code, layer) # 保存到数据库 count = LocationStatisticsService.save_statistics_to_db(stats_data) return Response({ 'message': f'成功统计了{count}条记录', 'timestamp': timezone.now().isoformat() }, status=status.HTTP_201_CREATED) class LocationStatisticsHistoryView(APIView): """货位统计历史数据API视图""" def get(self, request): """获取历史统计信息""" warehouse_code = request.GET.get('warehouse_code') layer = request.GET.get('layer') hours = int(request.GET.get('hours', 24)) # 默认最近24小时 end_time = timezone.now() start_time = end_time - timedelta(hours=hours) statistics = LocationStatisticsService.get_statistics_by_time_range( start_time, end_time, warehouse_code, layer ) serializer = LocationStatisticsSerializer(statistics, many=True) return Response({ 'period': f'{start_time} - {end_time}', 'data': serializer.data }) class LocationGroupStatisticsView(APIView): """货位组统计API视图""" def get(self, request): """获取货位组统计信息""" warehouse_code = request.GET.get('warehouse_code') layer = request.GET.get('layer') # 新增过滤参数 min_utilization = float(request.GET.get('min_utilization', 0)) # 最小使用率,默认0 min_used_locations = int(request.GET.get('min_used_locations', 0)) # 最小已用货位数,默认0 filters = {} if warehouse_code: filters['warehouse_code'] = warehouse_code if layer: try: filters['layer'] = int(layer) except ValueError: pass # 获取最新的组统计 from django.db.models import Max latest_stats = LocationGroupStatistics.objects.filter( **filters ).values('warehouse_code', 'layer', 'location_group').annotate( latest_time=Max('statistic_time') ) group_stats = [] for stat in latest_stats: latest_record = LocationGroupStatistics.objects.filter( warehouse_code=stat['warehouse_code'], layer=stat['layer'], location_group=stat['location_group'], statistic_time=stat['latest_time'] ).first() # 根据参数过滤 if latest_record: # 检查使用率条件 utilization_ok = latest_record.utilization_rate >= min_utilization # 检查已用货位数条件 used_locations_ok = latest_record.used_locations >= min_used_locations if utilization_ok and used_locations_ok: group_stats.append(latest_record) serializer = LocationGroupStatisticsSerializer(group_stats, many=True) return Response({ 'timestamp': timezone.now().isoformat(), 'filters': { 'min_utilization': min_utilization, 'min_used_locations': min_used_locations, 'total_records': len(group_stats) }, 'data': serializer.data }) class LocationConsistencyCheckView(APIView): """库位一致性检查API""" def post(self, request): warehouse_code = request.data.get('warehouse_code') layer_param = request.GET.get('layer') layer = None if layer_param is not None: try: layer_value = int(layer_param) if layer_value > 0: layer = layer_value except (TypeError, ValueError): layer = None auto_fix = self._to_bool(request.data.get('auto_fix', False)) fix_scope = request.data.get('fix_scope') checker = LocationConsistencyChecker( warehouse_code, layer, auto_fix, fix_scope=fix_scope ) checker.check_all() return Response({ 'success': True, 'data': checker.generate_report(), 'auto_fix': auto_fix, 'fix_scope': checker.fix_scope }) @staticmethod def _to_bool(value): if isinstance(value, bool): return value if isinstance(value, str): return value.strip().lower() in {'1', 'true', 'yes', 'y', 'on'} if isinstance(value, int): return value != 0 return False class LocationConsistencyChecker: TARGET_LOCATION_TYPES = ['T5', 'T4', 'S4', 'T2', 'T1'] """ 库位一致性检测器 用于检测库位状态与Link记录的一致性以及库位组的状态一致性 """ def __init__(self, warehouse_code=None, layer=None, auto_fix=False, fix_scope=None): """ 初始化检测器 Args: warehouse_code: 指定仓库代码,如果为None则检测所有仓库 layer: 指定楼层,如果为None则检测所有楼层 auto_fix: 是否自动修复检测到的问题 fix_scope: 修复范围(locations/details/groups),None表示全部 """ self.warehouse_code = warehouse_code self.layer = layer if self.layer is not None: try: layer_int = int(self.layer) self.layer = layer_int if layer_int > 0 else None except (TypeError, ValueError): self.layer = None self.auto_fix = auto_fix if isinstance(fix_scope, (list, tuple, set)): fix_scope = [scope for scope in fix_scope if scope in {'locations', 'groups', 'details'}] elif isinstance(fix_scope, str): if fix_scope == 'all': fix_scope = None elif fix_scope in {'locations', 'groups', 'details'}: fix_scope = [fix_scope] else: fix_scope = None else: fix_scope = None self.fix_scope = fix_scope or ['locations', 'groups', 'details'] self.results = { 'check_time': timezone.now(), 'location_errors': [], 'group_errors': [], 'detail_errors': [], 'fixed_locations': [], 'fixed_groups': [], 'fixed_details': [], 'repair_errors': [], 'summary': { 'total_locations': 0, 'checked_locations': 0, 'error_locations': 0, 'total_groups': 0, 'checked_groups': 0, 'error_groups': 0, 'total_details': 0, 'checked_details': 0, 'error_details': 0, 'fixed_location_count': 0, 'fixed_group_count': 0, 'fixed_detail_count': 0 } } def check_all(self): """执行所有检测""" logger.info(f"开始库位一致性检测,仓库: {self.warehouse_code}, 楼层: {self.layer}") # 检测库位状态与Link记录的一致性 self.check_location_link_consistency() # 检测托盘明细状态一致性 self.check_container_detail_status() # 检测库位组状态一致性 self.check_group_consistency() # 如果启用自动修复,执行修复 if self.auto_fix: self.fix_detected_issues() logger.info(f"库位一致性检测完成,发现{self.results['summary']['error_locations']}个库位问题," f"{self.results['summary']['error_groups']}个库位组问题," f"{self.results['summary']['error_details']}条托盘明细问题") return self.results def check_location_link_consistency(self): """检测库位状态与Link记录的一致性""" from bin.models import LocationModel, LocationContainerLink # 构建查询条件 filters = {'is_active': True, 'location_type__in': self.TARGET_LOCATION_TYPES} if self.warehouse_code: filters['warehouse_code'] = self.warehouse_code if self.layer is not None: filters['layer'] = self.layer # 获取库位并预取Link记录 locations = LocationModel.objects.filter(**filters).prefetch_related('container_links') self.results['summary']['total_locations'] = locations.count() for location in locations: self.results['summary']['checked_locations'] += 1 # 获取该库位的活跃Link记录数量 active_links_count = location.container_links.filter(is_active=True).count() # 根据状态判断是否一致 is_consistent, expected_status, error_type = self._check_location_consistency( location.status, active_links_count ) if not is_consistent: self.results['summary']['error_locations'] += 1 self.results['location_errors'].append({ 'location_id': location.id, 'location_code': location.location_code, 'warehouse_code': location.warehouse_code, 'current_status': location.status, 'expected_status': expected_status, 'active_links_count': active_links_count, 'layer': location.layer, 'row': location.row, 'col': location.col, 'location_group': location.location_group, 'error_type': error_type, 'detected_at': timezone.now() }) def _check_location_consistency(self, current_status, active_links_count): """ 检查单个库位的状态一致性 Returns: tuple: (是否一致, 预期状态, 错误类型) """ if current_status == 'available': # available状态应该没有活跃的Link记录 if active_links_count > 0: return False, 'occupied', 'available_but_has_links' elif current_status in ['occupied','reserved']: # occupied状态应该至少有一个活跃的Link记录 if active_links_count != 1: return False, 'available', 'occupied_but_no_links' elif current_status in ['disabled', 'maintenance']: # 这些特殊状态可以有Link记录,但通常不应该有 if active_links_count > 0: return False, current_status, 'special_status_has_links' return True, current_status, None def check_container_detail_status(self): """检测托盘明细状态的一致性""" from container.models import ContainerDetailModel from bin.models import LocationContainerLink details_qs = ( ContainerDetailModel.objects.filter(is_delete=False) .select_related('container', 'batch') .prefetch_related( Prefetch( 'container__location_links', queryset=LocationContainerLink.objects.filter(is_active=True).select_related('location'), to_attr='active_links' ) ) ) total_checked = 0 for detail in details_qs: if not detail.container: continue if not self._detail_in_scope(detail): continue goods_qty = self._to_decimal(detail.goods_qty) goods_out_qty = self._to_decimal(detail.goods_out_qty) remaining_qty = goods_qty - goods_out_qty expected_status, error_type = self._determine_detail_status(remaining_qty) total_checked += 1 self.results['summary']['checked_details'] += 1 if expected_status is None: continue if detail.status != expected_status: self.results['summary']['error_details'] += 1 self.results['detail_errors'].append({ 'detail_id': detail.id, 'container_id': detail.container_id, 'container_code': getattr(detail.container, 'container_code', None), 'batch_id': detail.batch_id, 'batch_number': getattr(detail.batch, 'bound_number', None) if detail.batch else None, 'goods_qty': str(goods_qty), 'goods_out_qty': str(goods_out_qty), 'remaining_qty': str(remaining_qty), 'current_status': detail.status, 'current_status_display': self._status_display(detail.status), 'expected_status': expected_status, 'expected_status_display': self._status_display(expected_status), 'error_type': error_type, 'detected_at': timezone.now() }) self.results['summary']['total_details'] = total_checked def _detail_in_scope(self, detail): """判断托盘明细是否在当前检测范围内""" if not (self.warehouse_code or self.layer): return True active_links = getattr(detail.container, 'active_links', None) if not active_links: return False warehouse_ok = True layer_ok = True if self.warehouse_code: warehouse_ok = any(link.location.warehouse_code == self.warehouse_code for link in active_links) if self.layer is not None: layer_ok = any(link.location.layer == self.layer for link in active_links) return warehouse_ok and layer_ok def _to_decimal(self, value): if isinstance(value, Decimal): return value if value is None: return Decimal('0') try: return Decimal(str(value)) except (InvalidOperation, TypeError, ValueError): return Decimal('0') def _determine_detail_status(self, remaining_qty): if remaining_qty > 0: return 2, 'detail_should_be_in_stock' return 3, 'detail_should_be_outbound' def _status_display(self, status): status_map = { 0: '空盘', 2: '在盘', 3: '离库' } return status_map.get(status, str(status) if status is not None else '未知') def check_group_consistency(self): """检测库位组状态的一致性""" from bin.models import LocationGroupModel # 构建查询条件 filters = {'is_active': True} if self.warehouse_code: filters['warehouse_code'] = self.warehouse_code if self.layer is not None: filters['layer'] = self.layer # 获取库位组并预取关联的库位 groups = LocationGroupModel.objects.filter(**filters).prefetch_related('location_items') self.results['summary']['total_groups'] = groups.count() for group in groups: self.results['summary']['checked_groups'] += 1 # 获取组内所有库位 group_locations = group.location_items.filter(is_active=True) # 统计组内库位的状态分布 status_counts = self._get_group_status_distribution(group_locations) total_locations = group_locations.count() # 根据组内库位状态推断组应该的状态 is_consistent, expected_status, error_type = self._check_group_consistency( group.status, status_counts, total_locations ) if not is_consistent: self.results['summary']['error_groups'] += 1 self.results['group_errors'].append({ 'group_id': group.id, 'group_code': group.group_code, 'group_name': group.group_name, 'warehouse_code': group.warehouse_code, 'current_status': group.status, 'expected_status': expected_status, 'total_locations': total_locations, 'status_distribution': status_counts, 'layer': group.layer, 'error_type': error_type, 'detected_at': timezone.now() }) def _get_group_status_distribution(self, group_locations): """获取组内库位状态分布""" return group_locations.aggregate( available_count=Count('id', filter=Q(status='available')), occupied_count=Count('id', filter=Q(status='occupied')), disabled_count=Count('id', filter=Q(status='disabled')), reserved_count=Count('id', filter=Q(status='reserved')), maintenance_count=Count('id', filter=Q(status='maintenance')) ) def _check_group_consistency(self, current_status, status_counts, total_locations): """ 检查库位组状态一致性 Returns: tuple: (是否一致, 预期状态, 错误类型) """ if total_locations == 0: # 空组应该是available或disabled if current_status not in ['available', 'disabled']: return False, 'available', 'empty_group_wrong_status' else: # 根据库位状态分布推断组状态 if status_counts['occupied_count'] == total_locations: # 所有库位都被占用,组状态应该是full if current_status != 'full': return False, 'full', 'all_occupied_but_not_full' elif status_counts['occupied_count'] > 0: # 有库位被占用,组状态应该是occupied if current_status != 'occupied': return False, 'occupied', 'has_occupied_but_wrong_status' elif status_counts['available_count'] == total_locations: # 所有库位都可用,组状态应该是available if current_status != 'available': return False, 'available', 'all_available_but_wrong_status' # 检查特殊状态 elif status_counts['disabled_count'] > 0 and current_status != 'disabled': return False, 'disabled', 'has_disabled_but_wrong_status' elif status_counts['maintenance_count'] > 0 and current_status != 'maintenance': return False, 'maintenance', 'has_maintenance_but_wrong_status' return True, current_status, None def fix_detected_issues(self): """修复检测到的不一致问题""" logger.info("开始修复检测到的不一致问题") # 修复库位状态不一致 if 'locations' in self.fix_scope and self.results['location_errors']: self._fix_location_issues() # 修复托盘明细状态不一致 if 'details' in self.fix_scope and self.results['detail_errors']: self._fix_detail_issues() # 修复库位组状态不一致 if 'groups' in self.fix_scope and self.results['group_errors']: self._fix_group_issues() logger.info(f"修复完成: {self.results['summary']['fixed_location_count']}个库位, " f"{self.results['summary']['fixed_group_count']}个库位组, " f"{self.results['summary']['fixed_detail_count']}条托盘明细") def _fix_location_issues(self): """修复库位状态不一致问题""" from bin.models import LocationModel for error in self.results['location_errors']: try: with transaction.atomic(): location = LocationModel.objects.select_for_update().get(id=error['location_id']) # 只有明确有预期状态时才修复 if error['expected_status'] and error['expected_status'] != 'need_check': old_status = location.status location.status = error['expected_status'] location.save() self.results['summary']['fixed_location_count'] += 1 self.results['fixed_locations'].append({ 'location_id': location.id, 'location_code': location.location_code, 'old_status': old_status, 'new_status': location.status, 'fixed_at': timezone.now(), 'error_type': error['error_type'] }) else: # 标记为需要手动检查 self.results['fixed_locations'].append({ 'location_id': location.id, 'location_code': location.location_code, 'status': '需要手动检查', 'reason': '无法自动确定正确状态', 'error_type': error['error_type'] }) except Exception as e: logger.error(f"修复库位{error.get('location_id')}时出错: {str(e)}") self.results['repair_errors'].append({ 'type': 'location_fix_error', 'location_id': error.get('location_id'), 'error_message': str(e), 'error_type': error.get('error_type') }) def _fix_group_issues(self): """修复库位组状态不一致问题""" from bin.models import LocationGroupModel for error in self.results['group_errors']: try: with transaction.atomic(): group = LocationGroupModel.objects.select_for_update().get(id=error['group_id']) # 只有明确有预期状态时才修复 if error['expected_status'] and error['expected_status'] != 'need_check': old_status = group.status group.status = error['expected_status'] group.save() self.results['summary']['fixed_group_count'] += 1 self.results['fixed_groups'].append({ 'group_id': group.id, 'group_code': group.group_code, 'old_status': old_status, 'new_status': group.status, 'fixed_at': timezone.now(), 'error_type': error['error_type'] }) else: # 标记为需要手动检查 self.results['fixed_groups'].append({ 'group_id': group.id, 'group_code': group.group_code, 'status': '需要手动检查', 'reason': '无法自动确定正确状态', 'error_type': error.get('error_type') }) except Exception as e: logger.error(f"修复库位组{error.get('group_id')}时出错: {str(e)}") self.results['repair_errors'].append({ 'type': 'group_fix_error', 'group_id': error.get('group_id'), 'error_message': str(e), 'error_type': error.get('error_type') }) def _fix_detail_issues(self): """修复托盘明细状态不一致问题""" from container.models import ContainerDetailModel for error in self.results['detail_errors']: expected_status = error.get('expected_status') if expected_status not in [2, 3]: continue try: with transaction.atomic(): detail = ContainerDetailModel.objects.select_for_update().select_related('container').get(id=error['detail_id']) if detail.status == expected_status: continue old_status = detail.status detail.status = expected_status detail.save(update_fields=['status']) self.results['summary']['fixed_detail_count'] += 1 error['fixed'] = True error['fixed_at'] = timezone.now() error['new_status'] = expected_status error['new_status_display'] = self._status_display(expected_status) self.results['fixed_details'].append({ 'detail_id': detail.id, 'container_code': getattr(detail.container, 'container_code', None), 'old_status': old_status, 'new_status': expected_status, 'fixed_at': timezone.now(), 'error_type': error.get('error_type') }) except ContainerDetailModel.DoesNotExist: logger.warning(f"托盘明细 {error.get('detail_id')} 不存在,跳过修复") except Exception as e: logger.error(f"修复托盘明细{error.get('detail_id')}失败: {str(e)}") self.results['repair_errors'].append({ 'type': 'detail_fix_error', 'detail_id': error.get('detail_id'), 'error_message': str(e), 'error_type': error.get('error_type') }) def get_summary(self): """获取检测摘要""" return { 'check_time': self.results['check_time'], 'total_checked': { 'locations': self.results['summary']['checked_locations'], 'groups': self.results['summary']['checked_groups'], 'details': self.results['summary']['checked_details'] }, 'errors_found': { 'locations': self.results['summary']['error_locations'], 'groups': self.results['summary']['error_groups'], 'details': self.results['summary']['error_details'] }, 'fixed': { 'locations': self.results['summary']['fixed_location_count'], 'groups': self.results['summary']['fixed_group_count'], 'details': self.results['summary']['fixed_detail_count'] }, 'has_errors': len(self.results['repair_errors']) > 0 } def generate_report(self): """生成检测报告""" summary = self.get_summary() report = { 'summary': summary, 'details': { 'location_errors': self.results['location_errors'], 'group_errors': self.results['group_errors'], 'detail_errors': self.results['detail_errors'], 'fixed_locations': self.results['fixed_locations'], 'fixed_groups': self.results['fixed_groups'], 'fixed_details': self.results['fixed_details'], 'repair_errors': self.results['repair_errors'] } } return report # 便捷函数 def check_location_consistency(warehouse_code=None, layer=None, auto_fix=False): """ 便捷函数:检查库位一致性 Args: warehouse_code: 仓库代码 layer: 楼层 auto_fix: 是否自动修复 Returns: dict: 检测结果 """ checker = LocationConsistencyChecker(warehouse_code, layer, auto_fix) return checker.check_all() def get_consistency_report(warehouse_code=None, layer=None, auto_fix=False): """ 获取详细的检测报告 Args: warehouse_code: 仓库代码 layer: 楼层 auto_fix: 是否自动修复 Returns: dict: 检测报告 """ checker = LocationConsistencyChecker(warehouse_code, layer, auto_fix) checker.check_all() return checker.generate_report()