|
|
@@ -0,0 +1,217 @@
|
|
|
+"""
|
|
|
+Server-Sent Events (SSE) 视图
|
|
|
+用于实时推送 ERP 任务数变化
|
|
|
+"""
|
|
|
+from django.http import StreamingHttpResponse
|
|
|
+from django.views.decorators.http import require_http_methods
|
|
|
+from django.views.decorators.csrf import csrf_exempt
|
|
|
+from django.utils.decorators import method_decorator
|
|
|
+from django.core.cache import cache
|
|
|
+from .models import InboundBill, OutboundBill
|
|
|
+import json
|
|
|
+import time
|
|
|
+import threading
|
|
|
+from django.db.models import Q
|
|
|
+import logging
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+# 全局连接管理器
|
|
|
+connections = {}
|
|
|
+connection_lock = threading.Lock()
|
|
|
+
|
|
|
+# 最大连接数限制(防止资源耗尽)
|
|
|
+MAX_CONNECTIONS = 100
|
|
|
+
|
|
|
+# 连接超时时间(秒)- 超过此时间未活跃的连接将被清理
|
|
|
+CONNECTION_TIMEOUT = 300 # 5分钟
|
|
|
+
|
|
|
+# 缓存键名
|
|
|
+CACHE_KEY_TASK_COUNTS = 'erp_task_counts'
|
|
|
+CACHE_TIMEOUT = 300 # 缓存5分钟,由信号处理自动清除
|
|
|
+
|
|
|
+
|
|
|
+def cleanup_stale_connections():
|
|
|
+ """
|
|
|
+ 清理超时的连接(定期调用)
|
|
|
+ 可以放在后台任务中定期执行
|
|
|
+ """
|
|
|
+ current_time = time.time()
|
|
|
+ cleaned = []
|
|
|
+
|
|
|
+ with connection_lock:
|
|
|
+ to_remove = []
|
|
|
+ for client_id, info in connections.items():
|
|
|
+ if current_time - info['last_active'] > CONNECTION_TIMEOUT:
|
|
|
+ to_remove.append(client_id)
|
|
|
+ cleaned.append(client_id)
|
|
|
+
|
|
|
+ for client_id in to_remove:
|
|
|
+ connections.pop(client_id, None)
|
|
|
+
|
|
|
+ if cleaned:
|
|
|
+ logger.info(f"清理了 {len(cleaned)} 个超时连接")
|
|
|
+
|
|
|
+ return len(cleaned)
|
|
|
+
|
|
|
+
|
|
|
+def get_erp_task_counts():
|
|
|
+ """
|
|
|
+ 获取 ERP 任务数(带缓存优化)
|
|
|
+ 使用缓存避免每个连接都查询数据库
|
|
|
+ """
|
|
|
+ # 尝试从缓存获取
|
|
|
+ cached_data = cache.get(CACHE_KEY_TASK_COUNTS)
|
|
|
+ if cached_data is not None:
|
|
|
+ return cached_data
|
|
|
+
|
|
|
+ # 缓存未命中,查询数据库
|
|
|
+ inbound_count = InboundBill.objects.filter(
|
|
|
+ bound_status=0,
|
|
|
+ is_delete=False
|
|
|
+ ).count()
|
|
|
+
|
|
|
+ outbound_count = OutboundBill.objects.filter(
|
|
|
+ bound_status=0,
|
|
|
+ is_delete=False
|
|
|
+ ).count()
|
|
|
+
|
|
|
+ result = {
|
|
|
+ 'inbound_tasks': inbound_count,
|
|
|
+ 'outbound_tasks': outbound_count
|
|
|
+ }
|
|
|
+
|
|
|
+ # 存入缓存(所有连接共享)
|
|
|
+ cache.set(CACHE_KEY_TASK_COUNTS, result, CACHE_TIMEOUT)
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+@csrf_exempt
|
|
|
+@require_http_methods(["GET"])
|
|
|
+def erp_task_stream(request):
|
|
|
+ """
|
|
|
+ SSE 流式响应,推送 ERP 任务数变化
|
|
|
+ 客户端通过 EventSource 连接此端点
|
|
|
+
|
|
|
+ 注意:EventSource 会自动发送 cookies,所以认证应该可以正常工作
|
|
|
+ 重要:在生成器外部获取用户信息,避免在流处理期间访问会话
|
|
|
+ """
|
|
|
+ # 在生成器外部获取用户信息,避免在流处理期间访问会话导致会话解码错误
|
|
|
+ try:
|
|
|
+ if hasattr(request, 'user') and request.user.is_authenticated:
|
|
|
+ user_id = getattr(request.user, 'id', None)
|
|
|
+ else:
|
|
|
+ user_id = 'anonymous'
|
|
|
+ except Exception as e:
|
|
|
+ # 如果访问用户信息时出错(可能因为会话损坏),使用匿名用户
|
|
|
+ logger.warning(f"获取用户信息失败,使用匿名用户: {str(e)}")
|
|
|
+ user_id = 'anonymous'
|
|
|
+
|
|
|
+ client_id = f"{user_id}_{int(time.time())}"
|
|
|
+ logger.info(f"SSE 连接建立: {client_id}")
|
|
|
+
|
|
|
+ def event_stream():
|
|
|
+ """
|
|
|
+ 流式响应生成器
|
|
|
+ 注意:在生成器内部不要访问 request.session 或 request.user
|
|
|
+ 这些信息已在生成器外部获取,避免会话解码错误
|
|
|
+ """
|
|
|
+ # 注册连接(检查连接数限制)
|
|
|
+ with connection_lock:
|
|
|
+ if len(connections) >= MAX_CONNECTIONS:
|
|
|
+ logger.warning(f"SSE 连接数已达上限 ({MAX_CONNECTIONS}),拒绝新连接: {client_id}")
|
|
|
+ yield f"event: error\ndata: {json.dumps({'error': '连接数已达上限,请稍后重试'})}\n\n"
|
|
|
+ return
|
|
|
+ connections[client_id] = {
|
|
|
+ 'created_at': time.time(),
|
|
|
+ 'last_active': time.time()
|
|
|
+ }
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 发送初始数据
|
|
|
+ initial_data = get_erp_task_counts()
|
|
|
+ yield f"data: {json.dumps(initial_data)}\n\n"
|
|
|
+
|
|
|
+ # 上次的任务数
|
|
|
+ last_inbound_count = initial_data['inbound_tasks']
|
|
|
+ last_outbound_count = initial_data['outbound_tasks']
|
|
|
+
|
|
|
+ # 检查间隔(秒)- 事件驱动:任务创建时会清除缓存,这里只需定期检查缓存
|
|
|
+ # 由于缓存会被信号自动清除,检查间隔可以更长
|
|
|
+ check_interval = 30
|
|
|
+
|
|
|
+ # 心跳间隔(秒)
|
|
|
+ heartbeat_interval = 30
|
|
|
+ last_heartbeat = time.time()
|
|
|
+
|
|
|
+ while True:
|
|
|
+ # 检查连接是否仍然活跃
|
|
|
+ if client_id not in connections:
|
|
|
+ logger.info(f"SSE 连接已移除: {client_id}")
|
|
|
+ break
|
|
|
+
|
|
|
+ current_time = time.time()
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 更新连接活跃时间
|
|
|
+ with connection_lock:
|
|
|
+ if client_id in connections:
|
|
|
+ connections[client_id]['last_active'] = current_time
|
|
|
+
|
|
|
+ # 获取当前任务数(使用共享缓存,减少数据库查询)
|
|
|
+ current_counts = get_erp_task_counts()
|
|
|
+ current_inbound = current_counts['inbound_tasks']
|
|
|
+ current_outbound = current_counts['outbound_tasks']
|
|
|
+
|
|
|
+ # 如果任务数发生变化,发送更新
|
|
|
+ if (current_inbound != last_inbound_count or
|
|
|
+ current_outbound != last_outbound_count):
|
|
|
+
|
|
|
+ logger.debug(f"任务数更新: 入库={current_inbound}, 出库={current_outbound}")
|
|
|
+ yield f"data: {json.dumps(current_counts)}\n\n"
|
|
|
+ last_inbound_count = current_inbound
|
|
|
+ last_outbound_count = current_outbound
|
|
|
+
|
|
|
+ # 发送心跳(保持连接活跃)
|
|
|
+ # 使用事件格式而不是注释,这样前端可以看到心跳
|
|
|
+ if current_time - last_heartbeat >= heartbeat_interval:
|
|
|
+ heartbeat_data = {
|
|
|
+ 'type': 'heartbeat',
|
|
|
+ 'timestamp': int(current_time),
|
|
|
+ 'message': '连接正常'
|
|
|
+ }
|
|
|
+ yield f"event: heartbeat\ndata: {json.dumps(heartbeat_data)}\n\n"
|
|
|
+ logger.debug(f"发送心跳: {client_id}")
|
|
|
+ last_heartbeat = current_time
|
|
|
+
|
|
|
+ time.sleep(check_interval)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"SSE 流处理错误: {str(e)}")
|
|
|
+ # 发生错误时发送错误信息
|
|
|
+ yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
|
|
|
+ time.sleep(check_interval)
|
|
|
+
|
|
|
+ except GeneratorExit:
|
|
|
+ # 客户端断开连接
|
|
|
+ logger.info(f"SSE 客户端断开: {client_id}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"SSE 流异常: {str(e)}")
|
|
|
+ finally:
|
|
|
+ # 清理连接
|
|
|
+ with connection_lock:
|
|
|
+ connections.pop(client_id, None)
|
|
|
+ logger.info(f"SSE 连接清理完成: {client_id}")
|
|
|
+
|
|
|
+ response = StreamingHttpResponse(
|
|
|
+ event_stream(),
|
|
|
+ content_type='text/event-stream'
|
|
|
+ )
|
|
|
+ response['Cache-Control'] = 'no-cache'
|
|
|
+ response['X-Accel-Buffering'] = 'no' # 禁用 nginx 缓冲
|
|
|
+ # 注意:不要设置 Connection 头,它是 hop-by-hop 头,由服务器自动处理
|
|
|
+ # HTTP/1.1 默认就是 keep-alive
|
|
|
+
|
|
|
+ return response
|
|
|
+
|