| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758 |
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from rest_framework import status
- from django.utils import timezone
- from datetime import timedelta
- from .services import LocationStatisticsService
- from .models import LocationGroupStatistics
- from django.db import transaction
- from django.db.models import Count, Q, Prefetch
- from decimal import Decimal, InvalidOperation
- import logging
- logger = logging.getLogger(__name__)
- from .serializers import LocationStatisticsSerializer, LocationGroupStatisticsSerializer
class LocationStatisticsView(APIView):
    """API view for location (bin) statistics.

    ``GET`` returns the latest stored statistics; ``POST`` triggers a new
    calculation through ``LocationStatisticsService`` and stores the result.
    """

    def get(self, request):
        """Return the latest statistics.

        Query parameters:
            warehouse_code: optional warehouse filter, forwarded unchanged.
            layer: optional layer filter. A value that cannot be converted
                to ``int`` is ignored (treated as "no layer filter").

        Returns:
            Response: ``{'timestamp': <ISO time>, 'data': [...]}``.
        """
        warehouse_code = request.GET.get('warehouse_code')
        layer = request.GET.get('layer')

        # Query-string values are strings; fall back to "no filter" on junk.
        if layer is not None:
            try:
                layer = int(layer)
            except ValueError:
                layer = None

        statistics = LocationStatisticsService.get_latest_statistics(warehouse_code, layer)
        serializer = LocationStatisticsSerializer(statistics, many=True)

        return Response({
            'timestamp': timezone.now().isoformat(),
            'data': serializer.data,
        })

    def post(self, request):
        """Calculate statistics on demand and persist them.

        Body parameters:
            warehouse_code: optional warehouse filter, forwarded unchanged.
            layer: optional layer. ``None`` or an empty string means "no
                layer"; any other value must be convertible to ``int``.

        Returns:
            Response: 201 with the number of saved records, or 400 when
            ``layer`` is present but not an integer.
        """
        warehouse_code = request.data.get('warehouse_code')
        layer = request.data.get('layer')

        if layer is None or layer == '':
            layer = None
        else:
            # This endpoint writes data, so a malformed layer is rejected
            # instead of being silently widened to "all layers".
            try:
                layer = int(layer)
            except (TypeError, ValueError):
                return Response(
                    {'message': 'layer 必须是整数'},
                    status=status.HTTP_400_BAD_REQUEST,
                )

        stats_data = LocationStatisticsService.calculate_location_statistics(warehouse_code, layer)
        count = LocationStatisticsService.save_statistics_to_db(stats_data)

        return Response({
            'message': f'成功统计了{count}条记录',
            'timestamp': timezone.now().isoformat()
        }, status=status.HTTP_201_CREATED)
class LocationStatisticsHistoryView(APIView):
    """API view returning location statistics for a recent time window."""

    def get(self, request):
        """Return the statistics recorded during the last ``hours`` hours.

        Query parameters:
            warehouse_code: optional warehouse filter, forwarded unchanged.
            layer: optional layer filter. A value that cannot be converted
                to ``int`` is ignored, matching ``LocationStatisticsView.get``.
            hours: size of the window in hours, a positive integer
                (default 24).

        Returns:
            Response: ``{'period': '<start> - <end>', 'data': [...]}``, or a
            400 response when ``hours`` is not a usable positive integer.
        """
        warehouse_code = request.GET.get('warehouse_code')
        layer = request.GET.get('layer')

        if layer is not None:
            try:
                layer = int(layer)
            except ValueError:
                layer = None

        end_time = timezone.now()
        try:
            hours = int(request.GET.get('hours', 24))
            if hours <= 0:
                raise ValueError('hours must be positive')
            # Both timedelta() and the subtraction raise OverflowError for
            # absurdly large windows.
            start_time = end_time - timedelta(hours=hours)
        except (ValueError, OverflowError):
            return Response(
                {'message': 'hours 必须是正整数'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        statistics = LocationStatisticsService.get_statistics_by_time_range(
            start_time, end_time, warehouse_code, layer
        )
        serializer = LocationStatisticsSerializer(statistics, many=True)

        return Response({
            'period': f'{start_time} - {end_time}',
            'data': serializer.data
        })
class LocationGroupStatisticsView(APIView):
    """API view returning the latest statistics record of each location group."""

    def get(self, request):
        """Return the newest record per (warehouse, layer, location group).

        Query parameters:
            warehouse_code: optional warehouse filter.
            layer: optional layer filter; ignored when it is not an integer.
            min_utilization: minimum ``utilization_rate`` a record must have
                to be returned (finite number, default 0).
            min_used_locations: minimum ``used_locations`` a record must have
                to be returned (integer, default 0).

        Returns:
            Response: the matching records together with the applied
            thresholds, or a 400 response when a threshold is not numeric.
        """
        import math

        from django.db.models import Max

        warehouse_code = request.GET.get('warehouse_code')
        layer = request.GET.get('layer')

        try:
            min_utilization = float(request.GET.get('min_utilization', 0))
            min_used_locations = int(request.GET.get('min_used_locations', 0))
            # nan would make every comparison below False; inf cannot be
            # echoed back in a strict JSON response.
            if not math.isfinite(min_utilization):
                raise ValueError('min_utilization must be finite')
        except ValueError:
            return Response(
                {'message': 'min_utilization 和 min_used_locations 必须是有效数字'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        filters = {}
        if warehouse_code:
            filters['warehouse_code'] = warehouse_code
        if layer:
            try:
                filters['layer'] = int(layer)
            except ValueError:
                pass

        # Newest statistic_time for every (warehouse, layer, group) triple.
        latest_stats = LocationGroupStatistics.objects.filter(
            **filters
        ).values('warehouse_code', 'layer', 'location_group').annotate(
            latest_time=Max('statistic_time')
        )

        group_stats = []
        # NOTE(review): this issues one query per group; acceptable for a
        # small number of groups, worth batching if the table grows.
        for stat in latest_stats:
            latest_record = LocationGroupStatistics.objects.filter(
                warehouse_code=stat['warehouse_code'],
                layer=stat['layer'],
                location_group=stat['location_group'],
                statistic_time=stat['latest_time']
            ).first()
            if latest_record is None:
                continue

            if (latest_record.utilization_rate >= min_utilization
                    and latest_record.used_locations >= min_used_locations):
                group_stats.append(latest_record)

        serializer = LocationGroupStatisticsSerializer(group_stats, many=True)

        return Response({
            'timestamp': timezone.now().isoformat(),
            'filters': {
                'min_utilization': min_utilization,
                'min_used_locations': min_used_locations,
                'total_records': len(group_stats)
            },
            'data': serializer.data
        })
-
class LocationConsistencyCheckView(APIView):
    """API endpoint that runs ``LocationConsistencyChecker`` on demand."""

    def post(self, request):
        """Run the consistency check and return its report.

        Body parameters:
            warehouse_code: optional warehouse to restrict the check to.
            layer: optional positive integer layer. For backward
                compatibility it is also accepted as a query-string
                parameter when absent from the body.
            auto_fix: truthy value (bool, int, or strings such as
                ``"true"``/``"1"``/``"yes"``) enabling automatic repair.
            fix_scope: forwarded to the checker, which normalises it.

        Returns:
            Response: ``success`` flag, the generated report, and the
            effective ``auto_fix`` / ``fix_scope`` settings.
        """
        warehouse_code = request.data.get('warehouse_code')

        # Every other option lives in the body, so look there first; the
        # query string remains as a fallback for existing callers.
        layer_param = request.data.get('layer')
        if layer_param is None:
            layer_param = request.GET.get('layer')

        layer = None
        if layer_param is not None:
            try:
                layer_value = int(layer_param)
            except (TypeError, ValueError):
                layer_value = None
            if layer_value is not None and layer_value > 0:
                layer = layer_value

        auto_fix = self._to_bool(request.data.get('auto_fix', False))
        fix_scope = request.data.get('fix_scope')

        checker = LocationConsistencyChecker(
            warehouse_code,
            layer,
            auto_fix,
            fix_scope=fix_scope
        )
        checker.check_all()

        return Response({
            'success': True,
            'data': checker.generate_report(),
            'auto_fix': auto_fix,
            'fix_scope': checker.fix_scope
        })

    @staticmethod
    def _to_bool(value):
        """Interpret a request value as a boolean.

        Booleans are returned unchanged, strings are true when they are one
        of ``1/true/yes/y/on`` (case-insensitive, surrounding whitespace
        ignored), integers are true when non-zero, anything else is false.
        """
        # bool must be tested before int because bool is an int subclass.
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.strip().lower() in {'1', 'true', 'yes', 'y', 'on'}
        if isinstance(value, int):
            return value != 0
        return False
class LocationConsistencyChecker:
    """Detect, and optionally repair, inconsistent location data.

    Three checks are run by :meth:`check_all`:

    * location status versus the number of active container links,
    * container-detail status versus the remaining quantity,
    * location-group status versus the statuses of its active locations.

    Findings, repairs and repair failures are accumulated in ``self.results``.
    """

    # Only locations of these types are inspected by the link check.
    TARGET_LOCATION_TYPES = ['T5', 'T4', 'S4', 'T2', 'T1']
    # Recognised repair scopes, in the order used for the default scope.
    VALID_FIX_SCOPES = ('locations', 'groups', 'details')
    # Location statuses reported in a group's status distribution.
    TRACKED_LOCATION_STATUSES = ('available', 'occupied', 'disabled', 'reserved', 'maintenance')
    # Expected-status sentinel meaning "cannot be repaired automatically".
    MANUAL_CHECK = 'need_check'

    def __init__(self, warehouse_code=None, layer=None, auto_fix=False, fix_scope=None):
        """Create a checker.

        Args:
            warehouse_code: warehouse to check; ``None`` checks every warehouse.
            layer: layer to check; ``None``, non-numeric or non-positive
                values check every layer.
            auto_fix: when true, :meth:`check_all` also repairs what it finds.
            fix_scope: ``'locations'``, ``'groups'``, ``'details'``, ``'all'``
                or a collection of the first three. ``None`` and anything
                unrecognised select every scope.
        """
        self.warehouse_code = warehouse_code
        self.layer = self._normalize_layer(layer)
        self.auto_fix = auto_fix
        self.fix_scope = self._normalize_fix_scope(fix_scope)
        self.results = {
            'check_time': timezone.now(),
            'location_errors': [],
            'group_errors': [],
            'detail_errors': [],
            'fixed_locations': [],
            'fixed_groups': [],
            'fixed_details': [],
            'repair_errors': [],
            'summary': {
                'total_locations': 0,
                'checked_locations': 0,
                'error_locations': 0,
                'total_groups': 0,
                'checked_groups': 0,
                'error_groups': 0,
                'total_details': 0,
                'checked_details': 0,
                'error_details': 0,
                'fixed_location_count': 0,
                'fixed_group_count': 0,
                'fixed_detail_count': 0
            }
        }

    @staticmethod
    def _normalize_layer(layer):
        """Return ``layer`` as a positive ``int``, or ``None`` if it is not one."""
        if layer is None:
            return None
        try:
            layer_int = int(layer)
        except (TypeError, ValueError):
            return None
        return layer_int if layer_int > 0 else None

    @classmethod
    def _normalize_fix_scope(cls, fix_scope):
        """Return the list of repair scopes selected by ``fix_scope``.

        Unknown entries are dropped; when nothing valid remains (including
        ``None`` and ``'all'``) every scope is selected.
        """
        if isinstance(fix_scope, (list, tuple, set)):
            selected = [scope for scope in fix_scope if scope in cls.VALID_FIX_SCOPES]
        elif isinstance(fix_scope, str) and fix_scope in cls.VALID_FIX_SCOPES:
            selected = [fix_scope]
        else:
            selected = []
        return selected or list(cls.VALID_FIX_SCOPES)

    def check_all(self):
        """Run every check, repair if ``auto_fix`` is set, return ``self.results``."""
        logger.info(f"开始库位一致性检测,仓库: {self.warehouse_code}, 楼层: {self.layer}")

        self.check_location_link_consistency()
        self.check_container_detail_status()
        self.check_group_consistency()

        if self.auto_fix:
            self.fix_detected_issues()

        logger.info(f"库位一致性检测完成,发现{self.results['summary']['error_locations']}个库位问题,"
                    f"{self.results['summary']['error_groups']}个库位组问题,"
                    f"{self.results['summary']['error_details']}条托盘明细问题")

        return self.results

    def check_location_link_consistency(self):
        """Compare each location's status with its number of active links.

        Inconsistent locations are appended to ``results['location_errors']``
        and counted in the summary.
        """
        from bin.models import LocationModel, LocationContainerLink

        filters = {'is_active': True, 'location_type__in': self.TARGET_LOCATION_TYPES}
        if self.warehouse_code:
            filters['warehouse_code'] = self.warehouse_code
        if self.layer is not None:
            filters['layer'] = self.layer

        # Count active links in the same query instead of one query per
        # location. NOTE(review): assumes ``container_links`` is also the
        # relation's query name (the Django default for a related_name).
        locations = LocationModel.objects.filter(**filters).annotate(
            active_link_total=Count(
                'container_links',
                filter=Q(container_links__is_active=True),
            )
        )
        self.results['summary']['total_locations'] = locations.count()

        for location in locations:
            self.results['summary']['checked_locations'] += 1
            active_links_count = location.active_link_total

            is_consistent, expected_status, error_type = self._check_location_consistency(
                location.status, active_links_count
            )
            if is_consistent:
                continue

            self.results['summary']['error_locations'] += 1
            self.results['location_errors'].append({
                'location_id': location.id,
                'location_code': location.location_code,
                'warehouse_code': location.warehouse_code,
                'current_status': location.status,
                'expected_status': expected_status,
                'active_links_count': active_links_count,
                'layer': location.layer,
                'row': location.row,
                'col': location.col,
                'location_group': location.location_group,
                'error_type': error_type,
                'detected_at': timezone.now()
            })

    def _check_location_consistency(self, current_status, active_links_count):
        """Judge one location's status against its active link count.

        Returns:
            tuple: ``(is_consistent, expected_status, error_type)``.
            ``expected_status`` is ``'need_check'`` when the correct status
            cannot be derived automatically.
        """
        if current_status == 'available':
            # An available location must not hold any active link.
            if active_links_count > 0:
                return False, 'occupied', 'available_but_has_links'

        elif current_status in ['occupied', 'reserved']:
            # Exactly one active link is expected.
            if active_links_count < 1:
                return False, 'available', 'occupied_but_no_links'
            if active_links_count > 1:
                # The location is in use, so freeing it would be wrong; the
                # surplus links have to be sorted out by a person.
                return False, self.MANUAL_CHECK, 'occupied_but_multiple_links'

        elif current_status in ['disabled', 'maintenance']:
            # Special statuses are not expected to carry active links.
            if active_links_count > 0:
                return False, current_status, 'special_status_has_links'

        return True, current_status, None

    def check_container_detail_status(self):
        """Compare each container detail's status with its remaining quantity.

        Details outside the warehouse/layer scope are skipped. Mismatches
        are appended to ``results['detail_errors']``.
        """
        from container.models import ContainerDetailModel
        from bin.models import LocationContainerLink

        details_qs = (
            ContainerDetailModel.objects.filter(is_delete=False)
            .select_related('container', 'batch')
            .prefetch_related(
                Prefetch(
                    'container__location_links',
                    queryset=LocationContainerLink.objects.filter(is_active=True).select_related('location'),
                    to_attr='active_links'
                )
            )
        )

        total_checked = 0

        for detail in details_qs:
            if not detail.container:
                continue
            if not self._detail_in_scope(detail):
                continue

            goods_qty = self._to_decimal(detail.goods_qty)
            goods_out_qty = self._to_decimal(detail.goods_out_qty)
            remaining_qty = goods_qty - goods_out_qty
            expected_status, error_type = self._determine_detail_status(remaining_qty)

            total_checked += 1
            self.results['summary']['checked_details'] += 1

            if expected_status is None or detail.status == expected_status:
                continue

            self.results['summary']['error_details'] += 1
            self.results['detail_errors'].append({
                'detail_id': detail.id,
                'container_id': detail.container_id,
                'container_code': getattr(detail.container, 'container_code', None),
                'batch_id': detail.batch_id,
                'batch_number': getattr(detail.batch, 'bound_number', None) if detail.batch else None,
                'goods_qty': str(goods_qty),
                'goods_out_qty': str(goods_out_qty),
                'remaining_qty': str(remaining_qty),
                'current_status': detail.status,
                'current_status_display': self._status_display(detail.status),
                'expected_status': expected_status,
                'expected_status_display': self._status_display(expected_status),
                'error_type': error_type,
                'detected_at': timezone.now()
            })

        self.results['summary']['total_details'] = total_checked

    def _detail_in_scope(self, detail):
        """Return whether ``detail`` belongs to the warehouse/layer being checked.

        Without a warehouse or layer restriction every detail is in scope.
        Otherwise the container needs an active link whose location matches
        all configured restrictions at once.
        """
        if not (self.warehouse_code or self.layer):
            return True

        active_links = getattr(detail.container, 'active_links', None)
        if not active_links:
            return False

        def link_matches(link):
            location = link.location
            if self.warehouse_code and location.warehouse_code != self.warehouse_code:
                return False
            if self.layer is not None and location.layer != self.layer:
                return False
            return True

        # Warehouse and layer must match on the same link, not on two
        # different ones.
        return any(link_matches(link) for link in active_links)

    def _to_decimal(self, value):
        """Convert ``value`` to a finite ``Decimal``; unusable values become 0."""
        if isinstance(value, Decimal):
            result = value
        elif value is None:
            return Decimal('0')
        else:
            try:
                result = Decimal(str(value))
            except (InvalidOperation, TypeError, ValueError):
                return Decimal('0')
        # NaN/Infinity would raise InvalidOperation in later arithmetic.
        return result if result.is_finite() else Decimal('0')

    def _determine_detail_status(self, remaining_qty):
        """Return ``(expected_status, error_type)`` for a remaining quantity.

        A positive remainder expects status 2, anything else status 3.
        """
        if remaining_qty > 0:
            return 2, 'detail_should_be_in_stock'
        return 3, 'detail_should_be_outbound'

    def _status_display(self, status):
        """Return the display label of a container-detail status code."""
        status_map = {
            0: '空盘',
            2: '在盘',
            3: '离库'
        }
        return status_map.get(status, str(status) if status is not None else '未知')

    def check_group_consistency(self):
        """Compare each group's status with the statuses of its active locations.

        Inconsistent groups are appended to ``results['group_errors']``.
        """
        from bin.models import LocationGroupModel

        filters = {'is_active': True}
        if self.warehouse_code:
            filters['warehouse_code'] = self.warehouse_code
        if self.layer is not None:
            filters['layer'] = self.layer

        groups = LocationGroupModel.objects.filter(**filters).prefetch_related('location_items')
        self.results['summary']['total_groups'] = groups.count()

        for group in groups:
            self.results['summary']['checked_groups'] += 1

            # Filter in Python so the prefetch cache is used; calling
            # .filter() on the relation would issue new queries per group.
            group_locations = [
                location for location in group.location_items.all() if location.is_active
            ]
            status_counts = self._get_group_status_distribution(group_locations)
            total_locations = len(group_locations)

            is_consistent, expected_status, error_type = self._check_group_consistency(
                group.status, status_counts, total_locations
            )
            if is_consistent:
                continue

            self.results['summary']['error_groups'] += 1
            self.results['group_errors'].append({
                'group_id': group.id,
                'group_code': group.group_code,
                'group_name': group.group_name,
                'warehouse_code': group.warehouse_code,
                'current_status': group.status,
                'expected_status': expected_status,
                'total_locations': total_locations,
                'status_distribution': status_counts,
                'layer': group.layer,
                'error_type': error_type,
                'detected_at': timezone.now()
            })

    def _get_group_status_distribution(self, group_locations):
        """Count the locations of a group per tracked status.

        Args:
            group_locations: iterable of objects exposing ``status``.

        Returns:
            dict: ``'<status>_count'`` -> number of locations, one key for
            every tracked status; other statuses are not counted.
        """
        distribution = {f'{name}_count': 0 for name in self.TRACKED_LOCATION_STATUSES}
        for location in group_locations:
            key = f'{location.status}_count'
            if key in distribution:
                distribution[key] += 1
        return distribution

    def _check_group_consistency(self, current_status, status_counts, total_locations):
        """Judge a group's status against its locations' status distribution.

        Returns:
            tuple: ``(is_consistent, expected_status, error_type)``.
        """
        if total_locations == 0:
            # An empty group may only be available or disabled.
            if current_status not in ['available', 'disabled']:
                return False, 'available', 'empty_group_wrong_status'

        elif status_counts['occupied_count'] == total_locations:
            if current_status != 'full':
                return False, 'full', 'all_occupied_but_not_full'

        elif status_counts['occupied_count'] > 0:
            if current_status != 'occupied':
                return False, 'occupied', 'has_occupied_but_wrong_status'

        elif status_counts['available_count'] == total_locations:
            if current_status != 'available':
                return False, 'available', 'all_available_but_wrong_status'

        # The status test sits inside each branch so that a group already in
        # the expected special status does not fall through to the next rule.
        elif status_counts['disabled_count'] > 0:
            if current_status != 'disabled':
                return False, 'disabled', 'has_disabled_but_wrong_status'

        elif status_counts['maintenance_count'] > 0:
            if current_status != 'maintenance':
                return False, 'maintenance', 'has_maintenance_but_wrong_status'

        return True, current_status, None

    def fix_detected_issues(self):
        """Repair the recorded errors of every scope listed in ``fix_scope``."""
        logger.info("开始修复检测到的不一致问题")

        if 'locations' in self.fix_scope and self.results['location_errors']:
            self._fix_location_issues()

        if 'details' in self.fix_scope and self.results['detail_errors']:
            self._fix_detail_issues()

        if 'groups' in self.fix_scope and self.results['group_errors']:
            self._fix_group_issues()

        logger.info(f"修复完成: {self.results['summary']['fixed_location_count']}个库位, "
                    f"{self.results['summary']['fixed_group_count']}个库位组, "
                    f"{self.results['summary']['fixed_detail_count']}条托盘明细")

    @classmethod
    def _needs_manual_check(cls, expected_status, current_status):
        """Return whether an error cannot be repaired by a status update.

        That is the case when no expected status is known, when it is the
        ``need_check`` sentinel, or when it equals the current status (the
        update would change nothing and must not be counted as a repair).
        """
        return (
            not expected_status
            or expected_status == cls.MANUAL_CHECK
            or expected_status == current_status
        )

    def _fix_location_issues(self):
        """Set each inconsistent location to its expected status.

        Each location is updated in its own transaction under a row lock.
        Errors without a usable expected status are recorded as needing a
        manual check; failures go to ``results['repair_errors']``.
        """
        from bin.models import LocationModel

        for error in self.results['location_errors']:
            try:
                with transaction.atomic():
                    location = LocationModel.objects.select_for_update().get(id=error['location_id'])

                    if self._needs_manual_check(error['expected_status'], location.status):
                        self.results['fixed_locations'].append({
                            'location_id': location.id,
                            'location_code': location.location_code,
                            'status': '需要手动检查',
                            'reason': '无法自动确定正确状态',
                            'error_type': error['error_type']
                        })
                        continue

                    old_status = location.status
                    location.status = error['expected_status']
                    location.save()

                    self.results['summary']['fixed_location_count'] += 1
                    self.results['fixed_locations'].append({
                        'location_id': location.id,
                        'location_code': location.location_code,
                        'old_status': old_status,
                        'new_status': location.status,
                        'fixed_at': timezone.now(),
                        'error_type': error['error_type']
                    })

            except Exception as e:
                # Best effort: one failing row must not stop the others.
                logger.error(f"修复库位{error.get('location_id')}时出错: {str(e)}")
                self.results['repair_errors'].append({
                    'type': 'location_fix_error',
                    'location_id': error.get('location_id'),
                    'error_message': str(e),
                    'error_type': error.get('error_type')
                })

    def _fix_group_issues(self):
        """Set each inconsistent group to its expected status.

        Each group is updated in its own transaction under a row lock.
        Errors without a usable expected status are recorded as needing a
        manual check; failures go to ``results['repair_errors']``.
        """
        from bin.models import LocationGroupModel

        for error in self.results['group_errors']:
            try:
                with transaction.atomic():
                    group = LocationGroupModel.objects.select_for_update().get(id=error['group_id'])

                    if self._needs_manual_check(error['expected_status'], group.status):
                        self.results['fixed_groups'].append({
                            'group_id': group.id,
                            'group_code': group.group_code,
                            'status': '需要手动检查',
                            'reason': '无法自动确定正确状态',
                            'error_type': error.get('error_type')
                        })
                        continue

                    old_status = group.status
                    group.status = error['expected_status']
                    group.save()

                    self.results['summary']['fixed_group_count'] += 1
                    self.results['fixed_groups'].append({
                        'group_id': group.id,
                        'group_code': group.group_code,
                        'old_status': old_status,
                        'new_status': group.status,
                        'fixed_at': timezone.now(),
                        'error_type': error['error_type']
                    })

            except Exception as e:
                # Best effort: one failing row must not stop the others.
                logger.error(f"修复库位组{error.get('group_id')}时出错: {str(e)}")
                self.results['repair_errors'].append({
                    'type': 'group_fix_error',
                    'group_id': error.get('group_id'),
                    'error_message': str(e),
                    'error_type': error.get('error_type')
                })

    def _fix_detail_issues(self):
        """Set each inconsistent container detail to its expected status.

        Only expected statuses 2 and 3 are applied. Details that no longer
        exist or already carry the expected status are skipped; other
        failures go to ``results['repair_errors']``.
        """
        from container.models import ContainerDetailModel

        for error in self.results['detail_errors']:
            expected_status = error.get('expected_status')
            if expected_status not in [2, 3]:
                continue

            try:
                with transaction.atomic():
                    detail = ContainerDetailModel.objects.select_for_update().select_related('container').get(id=error['detail_id'])
                    if detail.status == expected_status:
                        continue

                    old_status = detail.status
                    detail.status = expected_status
                    detail.save(update_fields=['status'])

                    self.results['summary']['fixed_detail_count'] += 1
                    # The error entry itself is annotated with the outcome.
                    error['fixed'] = True
                    error['fixed_at'] = timezone.now()
                    error['new_status'] = expected_status
                    error['new_status_display'] = self._status_display(expected_status)
                    self.results['fixed_details'].append({
                        'detail_id': detail.id,
                        'container_code': getattr(detail.container, 'container_code', None),
                        'old_status': old_status,
                        'new_status': expected_status,
                        'fixed_at': timezone.now(),
                        'error_type': error.get('error_type')
                    })
            except ContainerDetailModel.DoesNotExist:
                logger.warning(f"托盘明细 {error.get('detail_id')} 不存在,跳过修复")
            except Exception as e:
                logger.error(f"修复托盘明细{error.get('detail_id')}失败: {str(e)}")
                self.results['repair_errors'].append({
                    'type': 'detail_fix_error',
                    'detail_id': error.get('detail_id'),
                    'error_message': str(e),
                    'error_type': error.get('error_type')
                })

    def get_summary(self):
        """Return the check time and the checked / error / fixed counters.

        ``has_errors`` reports whether any *repair* failed; it does not
        reflect the number of inconsistencies found.
        """
        summary = self.results['summary']
        return {
            'check_time': self.results['check_time'],
            'total_checked': {
                'locations': summary['checked_locations'],
                'groups': summary['checked_groups'],
                'details': summary['checked_details']
            },
            'errors_found': {
                'locations': summary['error_locations'],
                'groups': summary['error_groups'],
                'details': summary['error_details']
            },
            'fixed': {
                'locations': summary['fixed_location_count'],
                'groups': summary['fixed_group_count'],
                'details': summary['fixed_detail_count']
            },
            'has_errors': len(self.results['repair_errors']) > 0
        }

    def generate_report(self):
        """Return the summary together with every error and repair list."""
        return {
            'summary': self.get_summary(),
            'details': {
                'location_errors': self.results['location_errors'],
                'group_errors': self.results['group_errors'],
                'detail_errors': self.results['detail_errors'],
                'fixed_locations': self.results['fixed_locations'],
                'fixed_groups': self.results['fixed_groups'],
                'fixed_details': self.results['fixed_details'],
                'repair_errors': self.results['repair_errors']
            }
        }
- # 便捷函数
def check_location_consistency(warehouse_code=None, layer=None, auto_fix=False, fix_scope=None):
    """Run a full consistency check and return the raw results.

    Args:
        warehouse_code: warehouse to check; ``None`` checks all of them.
        layer: layer to check; ``None`` checks all of them.
        auto_fix: whether detected problems are repaired as well.
        fix_scope: repair scope forwarded to ``LocationConsistencyChecker``;
            ``None`` (the default) selects every scope.

    Returns:
        dict: the checker's ``results`` dictionary as returned by
        ``check_all()``.
    """
    checker = LocationConsistencyChecker(warehouse_code, layer, auto_fix, fix_scope=fix_scope)
    return checker.check_all()
def get_consistency_report(warehouse_code=None, layer=None, auto_fix=False, fix_scope=None):
    """Run a full consistency check and return the structured report.

    Args:
        warehouse_code: warehouse to check; ``None`` checks all of them.
        layer: layer to check; ``None`` checks all of them.
        auto_fix: whether detected problems are repaired as well.
        fix_scope: repair scope forwarded to ``LocationConsistencyChecker``;
            ``None`` (the default) selects every scope.

    Returns:
        dict: the report built by ``generate_report()`` after ``check_all()``
        has run.
    """
    checker = LocationConsistencyChecker(warehouse_code, layer, auto_fix, fix_scope=fix_scope)
    checker.check_all()
    return checker.generate_report()
|