""" Server-Sent Events (SSE) 视图 用于实时推送 ERP 任务数变化 """ from django.http import StreamingHttpResponse from django.views.decorators.http import require_http_methods from django.views.decorators.csrf import csrf_exempt from django.utils.decorators import method_decorator from django.core.cache import cache from .models import InboundBill, OutboundBill import json import time import threading from django.db.models import Q import logging logger = logging.getLogger(__name__) # 全局连接管理器 connections = {} connection_lock = threading.Lock() # 最大连接数限制(防止资源耗尽) MAX_CONNECTIONS = 100 # 连接超时时间(秒)- 超过此时间未活跃的连接将被清理 CONNECTION_TIMEOUT = 300 # 5分钟 # 缓存键名 CACHE_KEY_TASK_COUNTS = 'erp_task_counts' CACHE_TIMEOUT = 300 # 缓存5分钟,由信号处理自动清除 def cleanup_stale_connections(): """ 清理超时的连接(定期调用) 可以放在后台任务中定期执行 """ current_time = time.time() cleaned = [] with connection_lock: to_remove = [] for client_id, info in connections.items(): if current_time - info['last_active'] > CONNECTION_TIMEOUT: to_remove.append(client_id) cleaned.append(client_id) for client_id in to_remove: connections.pop(client_id, None) if cleaned: logger.info(f"清理了 {len(cleaned)} 个超时连接") return len(cleaned) def get_erp_task_counts(): """ 获取 ERP 任务数(带缓存优化) 使用缓存避免每个连接都查询数据库 """ # 尝试从缓存获取 cached_data = cache.get(CACHE_KEY_TASK_COUNTS) if cached_data is not None: return cached_data # 缓存未命中,查询数据库 inbound_count = InboundBill.objects.filter( bound_status=0, is_delete=False ).count() outbound_count = OutboundBill.objects.filter( bound_status=0, is_delete=False ).count() result = { 'inbound_tasks': inbound_count, 'outbound_tasks': outbound_count } # 存入缓存(所有连接共享) cache.set(CACHE_KEY_TASK_COUNTS, result, CACHE_TIMEOUT) return result @csrf_exempt @require_http_methods(["GET"]) def erp_task_stream(request): """ SSE 流式响应,推送 ERP 任务数变化 客户端通过 EventSource 连接此端点 注意:EventSource 会自动发送 cookies,所以认证应该可以正常工作 重要:在生成器外部获取用户信息,避免在流处理期间访问会话 """ # 在生成器外部获取用户信息,避免在流处理期间访问会话导致会话解码错误 try: if hasattr(request, 'user') and request.user.is_authenticated: user_id = getattr(request.user, 'id', None) else: user_id = 'anonymous' except Exception as e: # 如果访问用户信息时出错(可能因为会话损坏),使用匿名用户 logger.warning(f"获取用户信息失败,使用匿名用户: {str(e)}") user_id = 'anonymous' client_id = f"{user_id}_{int(time.time())}" logger.info(f"SSE 连接建立: {client_id}") def event_stream(): """ 流式响应生成器 注意:在生成器内部不要访问 request.session 或 request.user 这些信息已在生成器外部获取,避免会话解码错误 """ # 注册连接(检查连接数限制) with connection_lock: if len(connections) >= MAX_CONNECTIONS: logger.warning(f"SSE 连接数已达上限 ({MAX_CONNECTIONS}),拒绝新连接: {client_id}") yield f"event: error\ndata: {json.dumps({'error': '连接数已达上限,请稍后重试'})}\n\n" return connections[client_id] = { 'created_at': time.time(), 'last_active': time.time() } try: # 发送初始数据 initial_data = get_erp_task_counts() yield f"data: {json.dumps(initial_data)}\n\n" # 上次的任务数 last_inbound_count = initial_data['inbound_tasks'] last_outbound_count = initial_data['outbound_tasks'] # 检查间隔(秒)- 事件驱动:任务创建时会清除缓存,这里只需定期检查缓存 # 由于缓存会被信号自动清除,检查间隔可以更长 check_interval = 30 # 心跳间隔(秒) heartbeat_interval = 30 last_heartbeat = time.time() while True: # 检查连接是否仍然活跃 if client_id not in connections: logger.info(f"SSE 连接已移除: {client_id}") break current_time = time.time() try: # 更新连接活跃时间 with connection_lock: if client_id in connections: connections[client_id]['last_active'] = current_time # 获取当前任务数(使用共享缓存,减少数据库查询) current_counts = get_erp_task_counts() current_inbound = current_counts['inbound_tasks'] current_outbound = current_counts['outbound_tasks'] # 如果任务数发生变化,发送更新 if (current_inbound != last_inbound_count or current_outbound != last_outbound_count): logger.debug(f"任务数更新: 入库={current_inbound}, 出库={current_outbound}") yield f"data: {json.dumps(current_counts)}\n\n" last_inbound_count = current_inbound last_outbound_count = current_outbound # 发送心跳(保持连接活跃) # 使用事件格式而不是注释,这样前端可以看到心跳 if current_time - last_heartbeat >= heartbeat_interval: heartbeat_data = { 'type': 'heartbeat', 'timestamp': int(current_time), 'message': '连接正常' } yield f"event: heartbeat\ndata: {json.dumps(heartbeat_data)}\n\n" logger.debug(f"发送心跳: {client_id}") last_heartbeat = current_time time.sleep(check_interval) except Exception as e: logger.error(f"SSE 流处理错误: {str(e)}") # 发生错误时发送错误信息 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" time.sleep(check_interval) except GeneratorExit: # 客户端断开连接 logger.info(f"SSE 客户端断开: {client_id}") except Exception as e: logger.error(f"SSE 流异常: {str(e)}") finally: # 清理连接 with connection_lock: connections.pop(client_id, None) logger.info(f"SSE 连接清理完成: {client_id}") response = StreamingHttpResponse( event_stream(), content_type='text/event-stream' ) response['Cache-Control'] = 'no-cache' response['X-Accel-Buffering'] = 'no' # 禁用 nginx 缓冲 # 注意:不要设置 Connection 头,它是 hop-by-hop 头,由服务器自动处理 # HTTP/1.1 默认就是 keep-alive return response