| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088 |
import logging
import math
from datetime import timedelta
from decimal import Decimal, InvalidOperation

from django.db import transaction
from django.db.models import Count, Max, Prefetch, Q
from django.utils import timezone
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from .models import LocationGroupStatistics
from .serializers import LocationStatisticsSerializer, LocationGroupStatisticsSerializer
from .services import LocationStatisticsService

logger = logging.getLogger(__name__)
class LocationStatisticsView(APIView):
    """API view for location statistics: read the latest, or recompute."""

    def get(self, request):
        """Return the latest stored statistics.

        Query parameters:
            warehouse_code: optional warehouse filter, forwarded unchanged.
            layer: optional layer filter; a value that cannot be parsed as
                an integer is treated as "no layer filter".
        """
        params = request.GET
        warehouse_code = params.get('warehouse_code')
        raw_layer = params.get('layer')

        # Convert the layer to an int when one was supplied.
        layer = None
        if raw_layer is not None:
            try:
                layer = int(raw_layer)
            except ValueError:
                layer = None

        # Read the most recent statistics from the database.
        latest = LocationStatisticsService.get_latest_statistics(warehouse_code, layer)
        payload = LocationStatisticsSerializer(latest, many=True).data

        return Response({
            'timestamp': timezone.now().isoformat(),
            'data': payload
        })

    def post(self, request):
        """Recompute the statistics on demand and persist them.

        Body parameters ``warehouse_code`` and ``layer`` are forwarded to the
        statistics service unchanged.
        """
        body = request.data

        # Compute, then store; the service reports how many rows were saved.
        computed = LocationStatisticsService.calculate_location_statistics(
            body.get('warehouse_code'),
            body.get('layer'),
        )
        count = LocationStatisticsService.save_statistics_to_db(computed)

        result = {
            'message': f'成功统计了{count}条记录',
            'timestamp': timezone.now().isoformat()
        }
        return Response(result, status=status.HTTP_201_CREATED)
class LocationStatisticsHistoryView(APIView):
    """API view returning location statistics for a recent time window."""

    def get(self, request):
        """Return the statistics recorded during the last ``hours`` hours.

        Query parameters:
            warehouse_code: optional warehouse filter, forwarded unchanged.
            layer: optional layer filter, forwarded unchanged.
            hours: size of the window in hours (default 24). Must be a
                positive integer; anything else yields HTTP 400 instead of
                an unhandled exception.
        """
        warehouse_code = request.GET.get('warehouse_code')
        layer = request.GET.get('layer')

        end_time = timezone.now()
        try:
            hours = int(request.GET.get('hours', 24))  # defaults to the last 24 hours
            if hours <= 0:
                raise ValueError('hours must be positive')
            # The subtraction is inside the try block because an oversized
            # window overflows timedelta / datetime.
            start_time = end_time - timedelta(hours=hours)
        except (TypeError, ValueError, OverflowError):
            return Response({
                'success': False,
                'message': 'hours 参数必须为正整数'
            }, status=status.HTTP_400_BAD_REQUEST)

        statistics = LocationStatisticsService.get_statistics_by_time_range(
            start_time, end_time, warehouse_code, layer
        )

        serializer = LocationStatisticsSerializer(statistics, many=True)

        return Response({
            'period': f'{start_time} - {end_time}',
            'data': serializer.data
        })
class LocationGroupStatisticsView(APIView):
    """API view returning the latest statistics row of each location group."""

    def get(self, request):
        """Return the newest statistics record per location group.

        Query parameters:
            warehouse_code: optional warehouse filter.
            layer: optional layer filter; ignored when it is not an integer.
            min_utilization: minimum utilization rate (default 0). Must be a
                finite number.
            min_used_locations: minimum number of used locations
                (default 0). Must be an integer.

        Malformed numeric thresholds yield HTTP 400.
        """
        warehouse_code = request.GET.get('warehouse_code')
        layer = request.GET.get('layer')

        # Threshold filters. nan/inf are rejected because they cannot be
        # rendered as JSON in the echoed 'filters' section.
        try:
            min_utilization = float(request.GET.get('min_utilization', 0))
            min_used_locations = int(request.GET.get('min_used_locations', 0))
            if not math.isfinite(min_utilization):
                raise ValueError('min_utilization must be finite')
        except (TypeError, ValueError):
            return Response({
                'success': False,
                'message': 'min_utilization 必须为数字, min_used_locations 必须为整数'
            }, status=status.HTTP_400_BAD_REQUEST)

        filters = {}
        if warehouse_code:
            filters['warehouse_code'] = warehouse_code
        if layer:
            try:
                filters['layer'] = int(layer)
            except ValueError:
                # A non-numeric layer simply means "do not filter by layer".
                pass

        # Newest statistic_time for every (warehouse, layer, group) triple.
        latest_stats = LocationGroupStatistics.objects.filter(
            **filters
        ).values('warehouse_code', 'layer', 'location_group').annotate(
            latest_time=Max('statistic_time')
        )

        group_stats = []
        for stat in latest_stats:
            latest_record = LocationGroupStatistics.objects.filter(
                warehouse_code=stat['warehouse_code'],
                layer=stat['layer'],
                location_group=stat['location_group'],
                statistic_time=stat['latest_time']
            ).first()

            if latest_record is None:
                continue

            # Keep the record only when it meets both thresholds.
            utilization_ok = latest_record.utilization_rate >= min_utilization
            used_locations_ok = latest_record.used_locations >= min_used_locations
            if utilization_ok and used_locations_ok:
                group_stats.append(latest_record)

        serializer = LocationGroupStatisticsSerializer(group_stats, many=True)

        return Response({
            'timestamp': timezone.now().isoformat(),
            'filters': {
                'min_utilization': min_utilization,
                'min_used_locations': min_used_locations,
                'total_records': len(group_stats)
            },
            'data': serializer.data
        })
-
class LocationConsistencyCheckView(APIView):
    """API endpoint that runs the location consistency check."""

    def post(self, request):
        """Run the check and return the generated report.

        Body parameters:
            warehouse_code: optional warehouse filter.
            layer: optional positive layer number. It is read from the body
                like the other parameters; the query string is still
                honoured as a fallback for existing clients.
            auto_fix: truthy value enables automatic repair.
            fix_scope: repair scope forwarded to the checker.
        """
        warehouse_code = request.data.get('warehouse_code')

        layer_param = request.data.get('layer')
        if layer_param is None:
            layer_param = request.GET.get('layer')

        # Only a positive integer counts as a layer filter.
        layer = None
        if layer_param is not None:
            try:
                layer_value = int(layer_param)
            except (TypeError, ValueError):
                layer_value = None
            if layer_value is not None and layer_value > 0:
                layer = layer_value

        auto_fix = self._to_bool(request.data.get('auto_fix', False))
        fix_scope = request.data.get('fix_scope')

        checker = LocationConsistencyChecker(
            warehouse_code,
            layer,
            auto_fix,
            fix_scope=fix_scope
        )
        checker.check_all()

        return Response({
            'success': True,
            'data': checker.generate_report(),
            'auto_fix': auto_fix,
            'fix_scope': checker.fix_scope
        })

    @staticmethod
    def _to_bool(value):
        """Coerce a JSON/form value to bool.

        Booleans pass through, strings are matched case-insensitively
        against a small set of "true" spellings, integers are true when
        non-zero, and every other type is False.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.strip().lower() in {'1', 'true', 'yes', 'y', 'on'}
        if isinstance(value, int):
            return value != 0
        return False
class LocationConsistencyChecker:
    """Detect, and optionally repair, inconsistent location bookkeeping.

    Three checks are performed:

    * location status versus the number of active container links,
    * container-detail status versus the remaining quantity,
    * location-group status versus the statuses of the group's locations.

    Findings accumulate in ``self.results``. When ``auto_fix`` is set,
    ``check_all`` also applies the repairs selected by ``fix_scope``.
    """

    # Only locations of these types take part in the location/link check.
    TARGET_LOCATION_TYPES = ['T5', 'T4', 'S4', 'T2', 'T1']

    # Repair scopes accepted through ``fix_scope``, in their default order.
    _FIX_SCOPES = ('locations', 'groups', 'details')

    def __init__(self, warehouse_code=None, layer=None, auto_fix=False, fix_scope=None):
        """Initialise the checker.

        Args:
            warehouse_code: warehouse to check; None checks every warehouse.
            layer: layer to check; None (or anything that is not a positive
                integer) checks every layer.
            auto_fix: whether detected problems are repaired automatically.
            fix_scope: what to repair: one of 'locations', 'groups',
                'details', or a list/tuple/set of them. None, 'all', an
                unknown value or an empty selection means everything.
        """
        self.warehouse_code = warehouse_code
        self.layer = layer
        if self.layer is not None:
            try:
                layer_int = int(self.layer)
                self.layer = layer_int if layer_int > 0 else None
            except (TypeError, ValueError):
                self.layer = None
        self.auto_fix = auto_fix

        if isinstance(fix_scope, (list, tuple, set)):
            fix_scope = [scope for scope in fix_scope if scope in self._FIX_SCOPES]
        elif isinstance(fix_scope, str) and fix_scope in self._FIX_SCOPES:
            fix_scope = [fix_scope]
        else:
            # Covers None, 'all', unknown strings and unsupported types.
            fix_scope = None
        self.fix_scope = fix_scope or list(self._FIX_SCOPES)

        self.results = {
            'check_time': timezone.now(),
            'location_errors': [],
            'group_errors': [],
            'detail_errors': [],
            'fixed_locations': [],
            'fixed_groups': [],
            'fixed_details': [],
            'repair_errors': [],
            'summary': {
                'total_locations': 0,
                'checked_locations': 0,
                'error_locations': 0,
                'total_groups': 0,
                'checked_groups': 0,
                'error_groups': 0,
                'total_details': 0,
                'checked_details': 0,
                'error_details': 0,
                'fixed_location_count': 0,
                'fixed_group_count': 0,
                'fixed_detail_count': 0
            }
        }

    def check_all(self):
        """Run every check, then the repairs when ``auto_fix`` is enabled.

        Returns:
            dict: the accumulated ``self.results``.
        """
        logger.info(f"开始库位一致性检测,仓库: {self.warehouse_code}, 楼层: {self.layer}")

        # Location status versus link records.
        self.check_location_link_consistency()

        # Container-detail status.
        self.check_container_detail_status()

        # Location-group status.
        self.check_group_consistency()

        if self.auto_fix:
            self.fix_detected_issues()

        logger.info(f"库位一致性检测完成,发现{self.results['summary']['error_locations']}个库位问题,"
                    f"{self.results['summary']['error_groups']}个库位组问题,"
                    f"{self.results['summary']['error_details']}条托盘明细问题")

        return self.results

    def check_location_link_consistency(self):
        """Compare each location's status with its active link count."""
        from bin.models import LocationModel, LocationContainerLink

        filters = {'is_active': True, 'location_type__in': self.TARGET_LOCATION_TYPES}
        if self.warehouse_code:
            filters['warehouse_code'] = self.warehouse_code
        if self.layer is not None:
            filters['layer'] = self.layer

        # Prefetch only the active links into a plain list attribute.
        # Filtering the related manager inside the loop would bypass the
        # prefetch cache and issue one query per location.
        active_links = Prefetch(
            'container_links',
            queryset=LocationContainerLink.objects.filter(is_active=True),
            to_attr='active_container_links'
        )
        locations = LocationModel.objects.filter(**filters).prefetch_related(active_links)
        self.results['summary']['total_locations'] = locations.count()

        for location in locations:
            self.results['summary']['checked_locations'] += 1

            active_links_count = len(location.active_container_links)

            is_consistent, expected_status, error_type = self._check_location_consistency(
                location.status, active_links_count
            )

            if not is_consistent:
                self.results['summary']['error_locations'] += 1
                self.results['location_errors'].append({
                    'location_id': location.id,
                    'location_code': location.location_code,
                    'warehouse_code': location.warehouse_code,
                    'current_status': location.status,
                    'expected_status': expected_status,
                    'active_links_count': active_links_count,
                    'layer': location.layer,
                    'row': location.row,
                    'col': location.col,
                    'location_group': location.location_group,
                    'error_type': error_type,
                    'detected_at': timezone.now()
                })

    def _check_location_consistency(self, current_status, active_links_count):
        """Judge one location's status against its active link count.

        Returns:
            tuple: (is_consistent, expected_status, error_type). An
            ``expected_status`` of 'need_check' means the correct status
            cannot be derived automatically; the repair step skips it.
        """
        if current_status == 'available':
            # An available location must not hold any active link.
            if active_links_count > 0:
                return False, 'occupied', 'available_but_has_links'

        elif current_status in ('occupied', 'reserved'):
            # An occupied/reserved location must hold exactly one active link.
            if active_links_count == 0:
                return False, 'available', 'occupied_but_no_links'
            if active_links_count > 1:
                # Several containers claim the same location. Freeing it
                # would be wrong, so flag it for manual review instead.
                return False, 'need_check', 'occupied_but_multiple_links'

        elif current_status in ('disabled', 'maintenance'):
            # Special statuses are reported when they still carry links;
            # the status itself is kept.
            if active_links_count > 0:
                return False, current_status, 'special_status_has_links'

        return True, current_status, None

    def check_container_detail_status(self):
        """Compare each container detail's status with its remaining quantity."""
        from container.models import ContainerDetailModel
        from bin.models import LocationContainerLink

        details_qs = (
            ContainerDetailModel.objects.filter(is_delete=False)
            .select_related('container', 'batch')
            .prefetch_related(
                Prefetch(
                    'container__location_links',
                    queryset=LocationContainerLink.objects.filter(is_active=True).select_related('location'),
                    to_attr='active_links'
                )
            )
        )

        total_checked = 0

        for detail in details_qs:
            if not detail.container:
                continue

            if not self._detail_in_scope(detail):
                continue

            goods_qty = self._to_decimal(detail.goods_qty)
            goods_out_qty = self._to_decimal(detail.goods_out_qty)
            remaining_qty = goods_qty - goods_out_qty
            expected_status, error_type = self._determine_detail_status(remaining_qty)

            total_checked += 1
            self.results['summary']['checked_details'] += 1

            if expected_status is None:
                continue

            if detail.status != expected_status:
                self.results['summary']['error_details'] += 1
                self.results['detail_errors'].append({
                    'detail_id': detail.id,
                    'container_id': detail.container_id,
                    'container_code': getattr(detail.container, 'container_code', None),
                    'batch_id': detail.batch_id,
                    'batch_number': getattr(detail.batch, 'bound_number', None) if detail.batch else None,
                    'goods_qty': str(goods_qty),
                    'goods_out_qty': str(goods_out_qty),
                    'remaining_qty': str(remaining_qty),
                    'current_status': detail.status,
                    'current_status_display': self._status_display(detail.status),
                    'expected_status': expected_status,
                    'expected_status_display': self._status_display(expected_status),
                    'error_type': error_type,
                    'detected_at': timezone.now()
                })

        self.results['summary']['total_details'] = total_checked

    def _detail_in_scope(self, detail):
        """Tell whether a container detail falls inside the checked scope.

        With no warehouse/layer filter every detail is in scope. Otherwise
        the detail's container needs at least one active link whose
        location satisfies all configured filters at once.
        """
        if not (self.warehouse_code or self.layer):
            return True

        active_links = getattr(detail.container, 'active_links', None)
        if not active_links:
            return False

        def link_matches(link):
            # Both filters must hold for the same link. Testing them with
            # separate any() calls would accept a container linked to
            # warehouse A on one layer and warehouse B on the wanted layer.
            location = link.location
            if self.warehouse_code and location.warehouse_code != self.warehouse_code:
                return False
            if self.layer is not None and location.layer != self.layer:
                return False
            return True

        return any(link_matches(link) for link in active_links)

    def _to_decimal(self, value):
        """Convert a value to Decimal; None or unparsable input becomes 0."""
        if isinstance(value, Decimal):
            return value
        if value is None:
            return Decimal('0')
        try:
            return Decimal(str(value))
        except (InvalidOperation, TypeError, ValueError):
            return Decimal('0')

    def _determine_detail_status(self, remaining_qty):
        """Map a remaining quantity to (expected_status, error_type).

        A positive remainder expects status 2, anything else status 3.
        """
        if remaining_qty > 0:
            return 2, 'detail_should_be_in_stock'
        return 3, 'detail_should_be_outbound'

    def _status_display(self, status):
        """Return the display label of a container-detail status code."""
        status_map = {
            0: '空盘',
            2: '在盘',
            3: '离库'
        }
        return status_map.get(status, str(status) if status is not None else '未知')

    def check_group_consistency(self):
        """Compare each group's status with the statuses of its locations."""
        from bin.models import LocationGroupModel

        filters = {'is_active': True}
        if self.warehouse_code:
            filters['warehouse_code'] = self.warehouse_code
        if self.layer is not None:
            filters['layer'] = self.layer

        # No prefetch here: the per-group queries below filter/aggregate the
        # related manager, which never reads a prefetch cache.
        groups = LocationGroupModel.objects.filter(**filters)
        self.results['summary']['total_groups'] = groups.count()

        for group in groups:
            self.results['summary']['checked_groups'] += 1

            # Active locations belonging to the group.
            group_locations = group.location_items.filter(is_active=True)

            status_counts = self._get_group_status_distribution(group_locations)
            total_locations = group_locations.count()

            is_consistent, expected_status, error_type = self._check_group_consistency(
                group.status, status_counts, total_locations
            )

            if not is_consistent:
                self.results['summary']['error_groups'] += 1
                self.results['group_errors'].append({
                    'group_id': group.id,
                    'group_code': group.group_code,
                    'group_name': group.group_name,
                    'warehouse_code': group.warehouse_code,
                    'current_status': group.status,
                    'expected_status': expected_status,
                    'total_locations': total_locations,
                    'status_distribution': status_counts,
                    'layer': group.layer,
                    'error_type': error_type,
                    'detected_at': timezone.now()
                })

    def _get_group_status_distribution(self, group_locations):
        """Count the group's locations per status in one aggregate query."""
        return group_locations.aggregate(
            available_count=Count('id', filter=Q(status='available')),
            occupied_count=Count('id', filter=Q(status='occupied')),
            disabled_count=Count('id', filter=Q(status='disabled')),
            reserved_count=Count('id', filter=Q(status='reserved')),
            maintenance_count=Count('id', filter=Q(status='maintenance'))
        )

    def _check_group_consistency(self, current_status, status_counts, total_locations):
        """Judge a group's status against its locations' status counts.

        Returns:
            tuple: (is_consistent, expected_status, error_type)
        """
        if total_locations == 0:
            # An empty group should be available or disabled.
            if current_status not in ['available', 'disabled']:
                return False, 'available', 'empty_group_wrong_status'
        else:
            if status_counts['occupied_count'] == total_locations:
                # Every location occupied: the group should be full.
                if current_status != 'full':
                    return False, 'full', 'all_occupied_but_not_full'

            elif status_counts['occupied_count'] > 0:
                # Some locations occupied: the group should be occupied.
                if current_status != 'occupied':
                    return False, 'occupied', 'has_occupied_but_wrong_status'

            elif status_counts['available_count'] == total_locations:
                # Every location available: the group should be available.
                if current_status != 'available':
                    return False, 'available', 'all_available_but_wrong_status'

            # Special statuses.
            elif status_counts['disabled_count'] > 0 and current_status != 'disabled':
                return False, 'disabled', 'has_disabled_but_wrong_status'

            elif status_counts['maintenance_count'] > 0 and current_status != 'maintenance':
                return False, 'maintenance', 'has_maintenance_but_wrong_status'

        return True, current_status, None

    def fix_detected_issues(self):
        """Repair the detected inconsistencies selected by ``fix_scope``."""
        logger.info("开始修复检测到的不一致问题")

        if 'locations' in self.fix_scope and self.results['location_errors']:
            self._fix_location_issues()

        if 'details' in self.fix_scope and self.results['detail_errors']:
            self._fix_detail_issues()

        if 'groups' in self.fix_scope and self.results['group_errors']:
            self._fix_group_issues()

        logger.info(f"修复完成: {self.results['summary']['fixed_location_count']}个库位, "
                    f"{self.results['summary']['fixed_group_count']}个库位组, "
                    f"{self.results['summary']['fixed_detail_count']}条托盘明细")

    def _fix_location_issues(self):
        """Set each inconsistent location to its expected status.

        Each location is repaired in its own transaction under a row lock.
        Errors whose expected status is empty or 'need_check' are recorded
        as requiring manual review. Failures go to ``repair_errors``.
        """
        from bin.models import LocationModel

        for error in self.results['location_errors']:
            try:
                with transaction.atomic():
                    location = LocationModel.objects.select_for_update().get(id=error['location_id'])

                    # Repair only when the expected status is unambiguous.
                    if error['expected_status'] and error['expected_status'] != 'need_check':
                        old_status = location.status
                        location.status = error['expected_status']
                        location.save()

                        self.results['summary']['fixed_location_count'] += 1

                        self.results['fixed_locations'].append({
                            'location_id': location.id,
                            'location_code': location.location_code,
                            'old_status': old_status,
                            'new_status': location.status,
                            'fixed_at': timezone.now(),
                            'error_type': error['error_type']
                        })
                    else:
                        # Mark as requiring a manual check.
                        self.results['fixed_locations'].append({
                            'location_id': location.id,
                            'location_code': location.location_code,
                            'status': '需要手动检查',
                            'reason': '无法自动确定正确状态',
                            'error_type': error['error_type']
                        })

            except Exception as e:
                logger.error(f"修复库位{error.get('location_id')}时出错: {str(e)}")
                self.results['repair_errors'].append({
                    'type': 'location_fix_error',
                    'location_id': error.get('location_id'),
                    'error_message': str(e),
                    'error_type': error.get('error_type')
                })

    def _fix_group_issues(self):
        """Set each inconsistent group to its expected status.

        Mirrors ``_fix_location_issues``: one transaction and row lock per
        group, manual-review entries for ambiguous cases, and failures
        recorded in ``repair_errors``.
        """
        from bin.models import LocationGroupModel

        for error in self.results['group_errors']:
            try:
                with transaction.atomic():
                    group = LocationGroupModel.objects.select_for_update().get(id=error['group_id'])

                    # Repair only when the expected status is unambiguous.
                    if error['expected_status'] and error['expected_status'] != 'need_check':
                        old_status = group.status
                        group.status = error['expected_status']
                        group.save()

                        self.results['summary']['fixed_group_count'] += 1

                        self.results['fixed_groups'].append({
                            'group_id': group.id,
                            'group_code': group.group_code,
                            'old_status': old_status,
                            'new_status': group.status,
                            'fixed_at': timezone.now(),
                            'error_type': error['error_type']
                        })
                    else:
                        # Mark as requiring a manual check.
                        self.results['fixed_groups'].append({
                            'group_id': group.id,
                            'group_code': group.group_code,
                            'status': '需要手动检查',
                            'reason': '无法自动确定正确状态',
                            'error_type': error.get('error_type')
                        })

            except Exception as e:
                logger.error(f"修复库位组{error.get('group_id')}时出错: {str(e)}")
                self.results['repair_errors'].append({
                    'type': 'group_fix_error',
                    'group_id': error.get('group_id'),
                    'error_message': str(e),
                    'error_type': error.get('error_type')
                })

    def _fix_detail_issues(self):
        """Set each inconsistent container detail to its expected status.

        Only expected statuses 2 and 3 are applied. A detail that has
        disappeared, or already carries the expected status when re-read
        under lock, is skipped. The matching entry in ``detail_errors`` is
        annotated with the repair outcome.
        """
        from container.models import ContainerDetailModel

        for error in self.results['detail_errors']:
            expected_status = error.get('expected_status')
            if expected_status not in [2, 3]:
                continue

            try:
                with transaction.atomic():
                    detail = ContainerDetailModel.objects.select_for_update().select_related('container').get(id=error['detail_id'])
                    if detail.status == expected_status:
                        continue

                    old_status = detail.status
                    detail.status = expected_status
                    detail.save(update_fields=['status'])

                    self.results['summary']['fixed_detail_count'] += 1
                    error['fixed'] = True
                    error['fixed_at'] = timezone.now()
                    error['new_status'] = expected_status
                    error['new_status_display'] = self._status_display(expected_status)
                    self.results['fixed_details'].append({
                        'detail_id': detail.id,
                        'container_code': getattr(detail.container, 'container_code', None),
                        'old_status': old_status,
                        'new_status': expected_status,
                        'fixed_at': timezone.now(),
                        'error_type': error.get('error_type')
                    })
            except ContainerDetailModel.DoesNotExist:
                logger.warning(f"托盘明细 {error.get('detail_id')} 不存在,跳过修复")
            except Exception as e:
                logger.error(f"修复托盘明细{error.get('detail_id')}失败: {str(e)}")
                self.results['repair_errors'].append({
                    'type': 'detail_fix_error',
                    'detail_id': error.get('detail_id'),
                    'error_message': str(e),
                    'error_type': error.get('error_type')
                })

    def get_summary(self):
        """Return the counters of the run in condensed form.

        ``has_errors`` reports whether any repair attempt failed; it does
        not reflect the detected inconsistencies.
        """
        summary = self.results['summary']
        return {
            'check_time': self.results['check_time'],
            'total_checked': {
                'locations': summary['checked_locations'],
                'groups': summary['checked_groups'],
                'details': summary['checked_details']
            },
            'errors_found': {
                'locations': summary['error_locations'],
                'groups': summary['error_groups'],
                'details': summary['error_details']
            },
            'fixed': {
                'locations': summary['fixed_location_count'],
                'groups': summary['fixed_group_count'],
                'details': summary['fixed_detail_count']
            },
            'has_errors': len(self.results['repair_errors']) > 0
        }

    def generate_report(self):
        """Return the full report: the summary plus every detail list."""
        summary = self.get_summary()

        report = {
            'summary': summary,
            'details': {
                'location_errors': self.results['location_errors'],
                'group_errors': self.results['group_errors'],
                'detail_errors': self.results['detail_errors'],
                'fixed_locations': self.results['fixed_locations'],
                'fixed_groups': self.results['fixed_groups'],
                'fixed_details': self.results['fixed_details'],
                'repair_errors': self.results['repair_errors']
            }
        }

        return report
# Convenience functions
def check_location_consistency(warehouse_code=None, layer=None, auto_fix=False, fix_scope=None):
    """Run the location consistency check and return the raw results.

    Args:
        warehouse_code: warehouse to check; None checks every warehouse.
        layer: layer to check; None checks every layer.
        auto_fix: whether detected problems are repaired automatically.
        fix_scope: optional repair scope forwarded to
            ``LocationConsistencyChecker``; None repairs everything.

    Returns:
        dict: the checker's results as returned by ``check_all``.
    """
    checker = LocationConsistencyChecker(warehouse_code, layer, auto_fix, fix_scope=fix_scope)
    return checker.check_all()
def get_consistency_report(warehouse_code=None, layer=None, auto_fix=False, fix_scope=None):
    """Run the location consistency check and return the detailed report.

    Args:
        warehouse_code: warehouse to check; None checks every warehouse.
        layer: layer to check; None checks every layer.
        auto_fix: whether detected problems are repaired automatically.
        fix_scope: optional repair scope forwarded to
            ``LocationConsistencyChecker``; None repairs everything.

    Returns:
        dict: the report built by ``generate_report``.
    """
    checker = LocationConsistencyChecker(warehouse_code, layer, auto_fix, fix_scope=fix_scope)
    checker.check_all()
    return checker.generate_report()
class BindContainerView(APIView):
    """Re-bind a container (pallet) to a storage location."""

    def post(self, request):
        """Bind ``container_code`` to ``location_code``.

        Every link currently active on the location is deactivated, the
        location/container link is created or re-activated, and the
        location is marked occupied. These steps and the group status
        refresh run in one transaction, so a failure cannot leave the
        location stripped of its links.

        Responds 400 when a parameter is missing, 404 when the location or
        container is unknown, and 500 on unexpected errors.
        """
        location_code = request.data.get('location_code')
        container_code = request.data.get('container_code')

        if not location_code:
            return Response({
                'success': False,
                'message': '缺少库位编码参数'
            }, status=status.HTTP_400_BAD_REQUEST)

        if not container_code:
            return Response({
                'success': False,
                'message': '缺少托盘编码参数'
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            from bin.models import LocationModel, LocationContainerLink
            from container.models import ContainerListModel
            from bin.views import LocationAllocation

            # A four-part code is resolved by its row/col/layer components
            # (parts 2-4); any other code is matched verbatim.
            code_parts = location_code.split('-')
            if len(code_parts) == 4:
                _, location_row, location_col, location_layer = code_parts
                location = LocationModel.objects.filter(
                    row=location_row,
                    col=location_col,
                    layer=location_layer,
                    is_active=True
                ).first()
            else:
                location = LocationModel.objects.filter(
                    location_code=location_code,
                    is_active=True
                ).first()

            if not location:
                return Response({
                    'success': False,
                    'message': f'库位 {location_code} 不存在或已禁用'
                }, status=status.HTTP_404_NOT_FOUND)

            # Make sure the container exists.
            container = ContainerListModel.objects.filter(
                container_code=container_code
            ).first()

            if not container:
                return Response({
                    'success': False,
                    'message': f'托盘 {container_code} 不存在'
                }, status=status.HTTP_404_NOT_FOUND)

            allocator = LocationAllocation()

            with transaction.atomic():
                # Release whatever is currently bound to the location.
                LocationContainerLink.objects.filter(
                    location=location,
                    is_active=True
                ).update(is_active=False)

                # Re-use an existing link (even an inactive one) if present.
                existing_link = LocationContainerLink.objects.filter(
                    location=location,
                    container=container
                ).first()

                if existing_link:
                    existing_link.is_active = True
                    existing_link.save()
                else:
                    LocationContainerLink.objects.create(
                        location=location,
                        container=container,
                        is_active=True,
                        operator=request.auth.name if request.auth else None,
                    )

                # The location now holds a container.
                location.status = 'occupied'
                location.save()

                # Refresh the status of the owning location group.
                allocator.update_location_group_status(location_code)

            logger.info(f"成功重新绑定托盘 {container_code} 到库位 {location_code}")

            return Response({
                'success': True,
                'code': '200',
                'message': '重新绑定托盘成功',
                'data': {
                    'location_code': location_code,
                    'container_code': container_code,
                    'status': 'occupied'
                }
            }, status=status.HTTP_200_OK)

        except Exception as e:
            logger.error(f"重新绑定托盘失败: {str(e)}", exc_info=True)
            return Response({
                'success': False,
                'message': f'重新绑定托盘失败: {str(e)}'
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class UpdateLocationStatusView(APIView):
    """Change the status of a storage location."""

    def post(self, request):
        """Set location ``location_code`` to the requested ``status``.

        Responds 400 for a missing or invalid parameter, 404 for an unknown
        or inactive location, and 500 when the update fails or raises.
        """
        payload = request.data
        location_code = payload.get('location_code')
        # Named location_status so it does not shadow the imported `status` module.
        location_status = payload.get('status')

        if not location_code:
            return Response(
                {'success': False, 'message': '缺少库位编码参数'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        if not location_status:
            return Response(
                {'success': False, 'message': '缺少状态参数'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Reject anything outside the known status values.
        valid_statuses = ['available', 'occupied', 'disabled', 'reserved', 'maintenance']
        if location_status not in valid_statuses:
            return Response(
                {
                    'success': False,
                    'message': f'无效的状态值,允许的值: {", ".join(valid_statuses)}'
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            from bin.models import LocationModel
            from bin.views import LocationAllocation

            # The location must exist and be active.
            location = LocationModel.objects.filter(
                location_code=location_code,
                is_active=True
            ).first()

            if not location:
                return Response(
                    {'success': False, 'message': f'库位 {location_code} 不存在或已禁用'},
                    status=status.HTTP_404_NOT_FOUND,
                )

            # Remember the previous status for the response.
            old_status = location.status

            # Delegate the actual change to LocationAllocation.
            updated = LocationAllocation().update_location_status(
                location_code, location_status, request=request
            )

            if not updated:
                return Response(
                    {'success': False, 'message': '更新库位状态失败'},
                    status=status.HTTP_500_INTERNAL_SERVER_ERROR,
                )

            logger.info(f"成功更新库位 {location_code} 状态为 {location_status}")

            return Response(
                {
                    'success': True,
                    'code': '200',
                    'message': '更新库位状态成功',
                    'data': {
                        'location_code': location_code,
                        'old_status': old_status,
                        'new_status': location_status
                    }
                },
                status=status.HTTP_200_OK,
            )

        except Exception as e:
            logger.error(f"更新库位状态失败: {str(e)}", exc_info=True)
            return Response(
                {'success': False, 'message': f'更新库位状态失败: {str(e)}'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
class UnbindContainerView(APIView):
    """Remove the binding between containers (pallets) and a location."""

    def post(self, request):
        """Unbind one container, or every container, from a location.

        Body parameters:
            location_code: required location code.
            container_code: optional; when given only that container is
                unbound, otherwise every active link of the location is.

        The number of unbound links is the row count returned by the
        UPDATE itself, executed inside the transaction. The reported count
        therefore always matches what was changed, with no check-then-act
        gap.

        Responds 400 when a parameter is missing or nothing is bound, 404
        for an unknown location/container, and 500 on unexpected errors.
        """
        location_code = request.data.get('location_code')
        container_code = request.data.get('container_code')  # optional

        if not location_code:
            return Response({
                'success': False,
                'message': '缺少库位编码参数'
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            from bin.models import LocationModel, LocationContainerLink
            from bin.views import LocationAllocation

            # The location must exist and be active.
            location = LocationModel.objects.filter(
                location_code=location_code,
                is_active=True
            ).first()

            if not location:
                return Response({
                    'success': False,
                    'message': f'库位 {location_code} 不存在或已禁用'
                }, status=status.HTTP_404_NOT_FOUND)

            # Links of this location that are currently active.
            location_links = LocationContainerLink.objects.filter(
                location=location,
                is_active=True
            )

            container = None
            target_links = location_links
            if container_code:
                # Narrow the operation down to the requested container.
                from container.models import ContainerListModel
                container = ContainerListModel.objects.filter(
                    container_code=container_code,
                    is_delete=False
                ).first()

                if not container:
                    return Response({
                        'success': False,
                        'message': f'托盘 {container_code} 不存在或已删除'
                    }, status=status.HTTP_404_NOT_FOUND)

                target_links = location_links.filter(container=container)

            remaining_count = 0
            with transaction.atomic():
                # update() returns the number of matched rows.
                unbound_count = target_links.update(is_active=False)

                if unbound_count:
                    if container is not None:
                        # Other containers may still occupy the location.
                        remaining_count = location_links.count()

                    if remaining_count == 0:
                        location.status = 'available'
                        location.save()

                    # Refresh the status of the owning location group.
                    allocator = LocationAllocation()
                    allocator.update_location_group_status(location_code)

            if not unbound_count:
                if container is not None:
                    message = f'库位 {location_code} 与托盘 {container_code} 没有绑定关系'
                else:
                    message = f'库位 {location_code} 没有绑定的托盘'
                return Response({
                    'success': False,
                    'message': message
                }, status=status.HTTP_400_BAD_REQUEST)

            if container is not None:
                logger.info(f"成功解除库位 {location_code} 与托盘 {container_code} 的绑定")

                return Response({
                    'success': True,
                    'code': '200',
                    'message': f'解除托盘 {container_code} 绑定成功',
                    'data': {
                        'location_code': location_code,
                        'container_code': container_code,
                        'unbound_count': unbound_count,
                        'remaining_links': remaining_count
                    }
                }, status=status.HTTP_200_OK)

            logger.info(f"成功解除库位 {location_code} 的所有托盘绑定")

            return Response({
                'success': True,
                'code': '200',
                'message': '解除所有托盘绑定成功',
                'data': {
                    'location_code': location_code,
                    'unbound_count': unbound_count,
                    'new_status': 'available'
                }
            }, status=status.HTTP_200_OK)

        except Exception as e:
            logger.error(f"解除托盘绑定失败: {str(e)}", exc_info=True)
            return Response({
                'success': False,
                'message': f'解除托盘绑定失败: {str(e)}'
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|