from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_POST from .backup_utils import perform_base_backup, restore_to_base_backup import os import json import logging from datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler from django.conf import settings import math from operation_log.views import log_success_operation, log_failure_operation from operation_log.models import OperationLog from django.utils import timezone import threading import sys import signal import platform import subprocess logger = logging.getLogger(__name__) # 初始化调度器 scheduler = BackgroundScheduler() def scheduled_backup(): """定时备份任务""" try: backup_path = perform_base_backup() logger.info(f"定时备份完成: {backup_path}") # 记录操作日志(定时任务没有request对象,直接创建日志) try: OperationLog.objects.create( operator="系统定时任务", operation_content=f"定时备份数据库完成,备份路径: {backup_path}", operation_level="other", operation_result="success", ip_address=None, user_agent="系统定时任务", request_method="CRON", request_path="/backup/scheduled", module_name="系统备份" ) except Exception as log_error: logger.error(f"定时备份日志记录失败: {str(log_error)}") # 更新托盘分类任务(如果存在) try: from container.utils import update_container_categories_task,reconcile_material_history update_container_categories_task() reconcile_material_history() logger.info(f"定时更新托盘分类完成") except ImportError: logger.warning("更新托盘分类模块未找到,跳过更新") except Exception as e: logger.error(f"定时备份失败: {str(e)}") # 记录失败日志 try: OperationLog.objects.create( operator="系统定时任务", operation_content=f"定时备份数据库失败: {str(e)}", operation_level="other", operation_result="failure", ip_address=None, user_agent="系统定时任务", request_method="CRON", request_path="/backup/scheduled", module_name="系统备份" ) except Exception as log_error: logger.error(f"定时备份失败日志记录失败: {str(log_error)}") def scheduled_consistency_fix(): """定时修复分组状态和托盘数一致性任务""" try: from location_statistics.views import LocationConsistencyChecker logger.info("开始执行定时一致性修复任务") fixed_groups = 0 error_groups = 0 fixed_details = 0 error_details = 0 group_fix_success = False detail_fix_success = False # 执行分组状态修复 try: group_checker = LocationConsistencyChecker( warehouse_code=None, # None表示检查所有仓库 layer=None, # None表示检查所有楼层 auto_fix=True, # 启用自动修复 fix_scope=['groups'] # 只修复分组状态 ) group_checker.check_all() group_report = group_checker.generate_report() fixed_groups = group_report['summary']['fixed']['groups'] error_groups = group_report['summary']['errors_found']['groups'] group_fix_success = True logger.info(f"分组状态修复完成: 发现{error_groups}个问题,修复{fixed_groups}个") except Exception as group_error: logger.error(f"分组状态修复失败: {str(group_error)}", exc_info=True) # 执行托盘明细状态修复 try: detail_checker = LocationConsistencyChecker( warehouse_code=None, # None表示检查所有仓库 layer=None, # None表示检查所有楼层 auto_fix=True, # 启用自动修复 fix_scope=['details'] # 只修复托盘明细状态 ) detail_checker.check_all() detail_report = detail_checker.generate_report() fixed_details = detail_report['summary']['fixed']['details'] error_details = detail_report['summary']['errors_found']['details'] detail_fix_success = True logger.info(f"托盘明细状态修复完成: 发现{error_details}个问题,修复{fixed_details}个") except Exception as detail_error: logger.error(f"托盘明细状态修复失败: {str(detail_error)}", exc_info=True) # 记录操作日志 if group_fix_success and detail_fix_success: operation_result = "success" operation_content = f"定时一致性修复完成: 分组状态-发现{error_groups}个问题,修复{fixed_groups}个;托盘明细-发现{error_details}个问题,修复{fixed_details}个" elif group_fix_success or detail_fix_success: operation_result = "partial" operation_content = f"定时一致性修复部分完成: " if group_fix_success: operation_content += f"分组状态-发现{error_groups}个问题,修复{fixed_groups}个;" else: operation_content += "分组状态修复失败;" if detail_fix_success: operation_content += f"托盘明细-发现{error_details}个问题,修复{fixed_details}个" else: operation_content += "托盘明细修复失败" else: operation_result = "failure" operation_content = "定时一致性修复失败: 分组状态和托盘明细修复均失败" try: OperationLog.objects.create( operator="系统定时任务", operation_content=operation_content, operation_level="other", operation_result=operation_result, ip_address=None, user_agent="系统定时任务", request_method="CRON", request_path="/backup/consistency_fix", module_name="数据一致性修复" ) except Exception as log_error: logger.error(f"定时一致性修复日志记录失败: {str(log_error)}") except ImportError as e: logger.warning(f"一致性修复模块未找到,跳过修复: {str(e)}") try: OperationLog.objects.create( operator="系统定时任务", operation_content=f"定时一致性修复失败: 模块未找到 - {str(e)}", operation_level="other", operation_result="failure", ip_address=None, user_agent="系统定时任务", request_method="CRON", request_path="/backup/consistency_fix", module_name="数据一致性修复" ) except Exception as log_error: logger.error(f"定时一致性修复失败日志记录失败: {str(log_error)}") except Exception as e: logger.error(f"定时一致性修复失败: {str(e)}", exc_info=True) # 记录失败日志 try: OperationLog.objects.create( operator="系统定时任务", operation_content=f"定时一致性修复失败: {str(e)}", operation_level="other", operation_result="failure", ip_address=None, user_agent="系统定时任务", request_method="CRON", request_path="/backup/consistency_fix", module_name="数据一致性修复" ) except Exception as log_error: logger.error(f"定时一致性修复失败日志记录失败: {str(log_error)}") # 启动定时备份(每6小时执行一次) if not scheduler.running: scheduler.add_job( scheduled_backup, 'cron', hour='*/6', # 每6小时执行一次 minute=0, # 在0分钟时执行 id='db_backup_job' ) # 启动定时一致性修复任务(每6小时执行一次) scheduler.add_job( scheduled_consistency_fix, 'cron', hour='*/6', # 每2小时执行一次 minute=10, # 在10分钟时执行(避免与备份任务冲突) id='consistency_fix_job' ) scheduler.start() logger.info("定时备份任务已启动") logger.info("定时一致性修复任务已启动") def get_backup_files(page=1, page_size=5): """获取备份文件列表(带分页)""" backup_dir = "E:/code/backup/postgres" all_backups = [] # 遍历备份目录 for root, dirs, files in os.walk(backup_dir): for file in files: if file.endswith(".backup"): file_path = os.path.join(root, file) file_size = os.path.getsize(file_path) timestamp = os.path.getmtime(file_path) all_backups.append({ "name": file, "path": file_path, "size": f"{file_size / (1024 * 1024):.2f} MB", "date": datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") }) # 按时间倒序排序 all_backups.sort(key=lambda x: x["date"], reverse=True) # 分页处理 total_items = len(all_backups) total_pages = math.ceil(total_items / page_size) start_index = (page - 1) * page_size end_index = min(start_index + page_size, total_items) return { "backups": all_backups[start_index:end_index], "page": page, "page_size": page_size, "total_items": total_items, "total_pages": total_pages } def get_base_backups(page=1, page_size=5): """获取基础备份列表(带分页)""" base_backup_dir = "E:/code/backup/postgres/base_backup" all_backups = [] # 遍历基础备份目录 for dir_name in os.listdir(base_backup_dir): dir_path = os.path.join(base_backup_dir, dir_name) if os.path.isdir(dir_path): timestamp = os.path.getmtime(dir_path) all_backups.append({ "name": dir_name, "path": dir_path, "date": datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") }) # 按时间倒序排序 all_backups.sort(key=lambda x: x["date"], reverse=True) # 分页处理 total_items = len(all_backups) total_pages = math.ceil(total_items / page_size) start_index = (page - 1) * page_size end_index = min(start_index + page_size, total_items) return { "backups": all_backups[start_index:end_index], "page": page, "page_size": page_size, "total_items": total_items, "total_pages": total_pages } @csrf_exempt @require_POST def trigger_backup(request): """手动触发备份的API接口""" try: backup_path = perform_base_backup() # 记录成功日志 try: log_success_operation( request=request, operation_content=f"手动触发数据库备份,备份路径: {backup_path}", operation_level="other", module_name="系统备份" ) except Exception as log_error: logger.error(f"备份成功日志记录失败: {str(log_error)}") return JsonResponse({ 'status': 'success', 'message': '数据库备份完成', 'path': backup_path }) except Exception as e: # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"手动触发数据库备份失败: {str(e)}", operation_level="other", module_name="系统备份" ) except Exception as log_error: logger.error(f"备份失败日志记录失败: {str(log_error)}") return JsonResponse({ 'status': 'error', 'message': str(e) }, status=500) @csrf_exempt @require_POST def list_backups(request): """获取备份文件列表API(带分页)""" try: data = json.loads(request.body) backup_type = data.get('type', 'file') page = data.get('page', 1) page_size = data.get('page_size', 5) if backup_type == 'file': result = get_backup_files(page, page_size) elif backup_type == 'base': result = get_base_backups(page, page_size) else: return JsonResponse({ 'status': 'error', 'message': '无效的备份类型' }, status=400) return JsonResponse({ 'status': 'success', 'data': result }) except Exception as e: logger.error(f"获取备份列表失败: {str(e)}") return JsonResponse({ 'status': 'error', 'message': str(e) }, status=500) @csrf_exempt @require_POST def restore_to_point(request): """执行时间点恢复API""" try: data = json.loads(request.body) base_backup = data.get('base_backup') if not base_backup or not os.path.exists(base_backup): # 记录失败日志(无效路径) try: log_failure_operation( request=request, operation_content=f"执行数据库恢复失败: 无效的基础备份路径 - {base_backup}", operation_level="other", module_name="系统备份" ) except Exception as log_error: logger.error(f"恢复失败日志记录失败: {str(log_error)}") return JsonResponse({ 'status': 'error', 'message': '无效的基础备份路径' }, status=400) # 暂停定时备份任务 scheduler.pause_job('db_backup_job') logger.info("定时备份任务已暂停") # 执行时间点恢复 restore_to_base_backup( base_backup) # 恢复定时备份任务 scheduler.resume_job('db_backup_job') logger.info("定时备份任务已恢复") # 记录成功日志 try: log_success_operation( request=request, operation_content=f"执行数据库时间点恢复,恢复路径: {base_backup}", operation_level="other", module_name="系统备份" ) except Exception as log_error: logger.error(f"恢复成功日志记录失败: {str(log_error)}") return JsonResponse({ 'status': 'success', 'message': f'已成功恢复到{base_backup}' }) except Exception as e: logger.error(f"时间点恢复失败: {str(e)}") # 记录失败日志 try: log_failure_operation( request=request, operation_content=f"执行数据库时间点恢复失败: {str(e)}", operation_level="other", module_name="系统备份" ) except Exception as log_error: logger.error(f"恢复失败日志记录失败: {str(log_error)}") # 确保恢复定时备份任务 if scheduler.get_job('db_backup_job') and scheduler.get_job('db_backup_job').next_run_time is None: scheduler.resume_job('db_backup_job') logger.info("恢复失败后定时备份任务已恢复") return JsonResponse({ 'status': 'error', 'message': str(e) }, status=500) def _shutdown_system(delay=5): """延迟关闭系统""" def shutdown(): try: if scheduler.running: scheduler.shutdown(wait=False) logger.info("定时任务调度器已停止") except Exception as e: logger.error(f"停止调度器失败: {str(e)}") os._exit(0) # 使用线程延迟执行关闭 timer = threading.Timer(delay, shutdown) timer.daemon = True timer.start() @csrf_exempt @require_POST def shutdown_system(request): """远程关闭系统接口""" try: data = json.loads(request.body) if request.body else {} delay = int(data.get('delay', 5)) # 默认5秒延迟 # 验证延迟时间范围(1-60秒) if delay < 1 or delay > 60: delay = 5 _shutdown_system(delay=delay) return JsonResponse({ 'status': 'success', 'message': f' {delay} ', 'delay': delay }) except Exception as e: logger.error(f"处理系统关闭请求失败: {str(e)}", exc_info=True) return JsonResponse({ 'status': 'error', 'message': f'{str(e)}' }, status=500) @csrf_exempt @require_POST def shutdown_computer(request): try: data = json.loads(request.body) if request.body else {} delay = int(data.get('delay', 60)) if delay < 0 or delay > 600: delay = 60 system = platform.system() try: if system == "Windows": subprocess.run(["shutdown", "/s", "/t", str(delay), "/f"], check=True) elif system == "Linux": subprocess.run(["shutdown", "-h", "+" + str(delay // 60) if delay >= 60 else "now"], check=True) elif system == "Darwin": subprocess.run(["sudo", "shutdown", "-h", "+" + str(delay // 60) if delay >= 60 else "now"], check=True) else: return JsonResponse({ 'status': 'error', 'message': f'不支持的操作系统: {system}' }, status=400) return JsonResponse({ 'status': 'success', 'message': f' {delay} 秒', 'delay': delay, 'system': system }) except subprocess.CalledProcessError as e: logger.error(f"执行失败: {str(e)}") return JsonResponse({ 'status': 'error', 'message': f'{str(e)}' }, status=500) except PermissionError: logger.error("关闭失败: 权限不足") return JsonResponse({ 'status': 'error', 'message': '权限不足' }, status=403) except ValueError: return JsonResponse({ 'status': 'error', 'message': '无效的延迟时间参数' }, status=400) except Exception as e: logger.error(f"关闭失败: {str(e)}", exc_info=True) return JsonResponse({ 'status': 'error', 'message': f'处理关闭请求失败: {str(e)}' }, status=500)