""" Server-Sent Events (SSE) 视图 用于实时推送 ERP 任务数变化 """ from django.http import StreamingHttpResponse from django.views.decorators.http import require_http_methods from django.views.decorators.csrf import csrf_exempt from django.utils.decorators import method_decorator from .models import InboundBill, OutboundBill import json import time import threading from django.db.models import Q import logging logger = logging.getLogger(__name__) # 全局连接管理器 connections = {} connection_lock = threading.Lock() def get_erp_task_counts(): """获取 ERP 任务数""" inbound_count = InboundBill.objects.filter( bound_status=0, is_delete=False ).count() outbound_count = OutboundBill.objects.filter( bound_status=0, is_delete=False ).count() return { 'inbound_tasks': inbound_count, 'outbound_tasks': outbound_count } @csrf_exempt @require_http_methods(["GET"]) def erp_task_stream(request): """ SSE 流式响应,推送 ERP 任务数变化 客户端通过 EventSource 连接此端点 注意:EventSource 会自动发送 cookies,所以认证应该可以正常工作 """ def event_stream(): # 生成客户端 ID user_id = getattr(request.user, 'id', None) if hasattr(request, 'user') and request.user.is_authenticated else 'anonymous' client_id = f"{user_id}_{int(time.time())}" logger.info(f"SSE 连接建立: {client_id}") # 注册连接 with connection_lock: connections[client_id] = True try: # 发送初始数据 initial_data = get_erp_task_counts() yield f"data: {json.dumps(initial_data)}\n\n" # 上次的任务数 last_inbound_count = initial_data['inbound_tasks'] last_outbound_count = initial_data['outbound_tasks'] # 检查间隔(秒) check_interval = 3 # 心跳间隔(秒) heartbeat_interval = 30 last_heartbeat = time.time() while True: # 检查连接是否仍然活跃 if client_id not in connections: logger.info(f"SSE 连接已移除: {client_id}") break current_time = time.time() try: # 获取当前任务数 current_counts = get_erp_task_counts() current_inbound = current_counts['inbound_tasks'] current_outbound = current_counts['outbound_tasks'] # 如果任务数发生变化,发送更新 if (current_inbound != last_inbound_count or current_outbound != last_outbound_count): logger.debug(f"任务数更新: 入库={current_inbound}, 出库={current_outbound}") yield f"data: {json.dumps(current_counts)}\n\n" last_inbound_count = current_inbound last_outbound_count = current_outbound # 发送心跳(保持连接活跃) # 使用事件格式而不是注释,这样前端可以看到心跳 if current_time - last_heartbeat >= heartbeat_interval: heartbeat_data = { 'type': 'heartbeat', 'timestamp': int(current_time), 'message': '连接正常' } yield f"event: heartbeat\ndata: {json.dumps(heartbeat_data)}\n\n" logger.debug(f"发送心跳: {client_id}") last_heartbeat = current_time time.sleep(check_interval) except Exception as e: logger.error(f"SSE 流处理错误: {str(e)}") # 发生错误时发送错误信息 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" time.sleep(check_interval) except GeneratorExit: # 客户端断开连接 logger.info(f"SSE 客户端断开: {client_id}") except Exception as e: logger.error(f"SSE 流异常: {str(e)}") finally: # 清理连接 with connection_lock: connections.pop(client_id, None) logger.info(f"SSE 连接清理完成: {client_id}") response = StreamingHttpResponse( event_stream(), content_type='text/event-stream' ) response['Cache-Control'] = 'no-cache' response['X-Accel-Buffering'] = 'no' # 禁用 nginx 缓冲 # 注意:不要设置 Connection 头,它是 hop-by-hop 头,由服务器自动处理 # HTTP/1.1 默认就是 keep-alive return response