| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- """
- Server-Sent Events (SSE) 视图
- 用于实时推送 ERP 任务数变化
- """
- from django.http import StreamingHttpResponse
- from django.views.decorators.http import require_http_methods
- from django.views.decorators.csrf import csrf_exempt
- from django.utils.decorators import method_decorator
- from .models import InboundBill, OutboundBill
- import json
- import time
- import threading
- from django.db.models import Q
- import logging
- logger = logging.getLogger(__name__)
- # 全局连接管理器
- connections = {}
- connection_lock = threading.Lock()
- def get_erp_task_counts():
- """获取 ERP 任务数"""
- inbound_count = InboundBill.objects.filter(
- bound_status=0,
- is_delete=False
- ).count()
-
- outbound_count = OutboundBill.objects.filter(
- bound_status=0,
- is_delete=False
- ).count()
-
- return {
- 'inbound_tasks': inbound_count,
- 'outbound_tasks': outbound_count
- }
- @csrf_exempt
- @require_http_methods(["GET"])
- def erp_task_stream(request):
- """
- SSE 流式响应,推送 ERP 任务数变化
- 客户端通过 EventSource 连接此端点
-
- 注意:EventSource 会自动发送 cookies,所以认证应该可以正常工作
- """
- def event_stream():
- # 生成客户端 ID
- user_id = getattr(request.user, 'id', None) if hasattr(request, 'user') and request.user.is_authenticated else 'anonymous'
- client_id = f"{user_id}_{int(time.time())}"
-
- logger.info(f"SSE 连接建立: {client_id}")
-
- # 注册连接
- with connection_lock:
- connections[client_id] = True
-
- try:
- # 发送初始数据
- initial_data = get_erp_task_counts()
- yield f"data: {json.dumps(initial_data)}\n\n"
-
- # 上次的任务数
- last_inbound_count = initial_data['inbound_tasks']
- last_outbound_count = initial_data['outbound_tasks']
-
- # 检查间隔(秒)
- check_interval = 3
-
- # 心跳间隔(秒)
- heartbeat_interval = 30
- last_heartbeat = time.time()
-
- while True:
- # 检查连接是否仍然活跃
- if client_id not in connections:
- logger.info(f"SSE 连接已移除: {client_id}")
- break
-
- current_time = time.time()
-
- try:
- # 获取当前任务数
- current_counts = get_erp_task_counts()
- current_inbound = current_counts['inbound_tasks']
- current_outbound = current_counts['outbound_tasks']
-
- # 如果任务数发生变化,发送更新
- if (current_inbound != last_inbound_count or
- current_outbound != last_outbound_count):
-
- logger.debug(f"任务数更新: 入库={current_inbound}, 出库={current_outbound}")
- yield f"data: {json.dumps(current_counts)}\n\n"
- last_inbound_count = current_inbound
- last_outbound_count = current_outbound
-
- # 发送心跳(保持连接活跃)
- # 使用事件格式而不是注释,这样前端可以看到心跳
- if current_time - last_heartbeat >= heartbeat_interval:
- heartbeat_data = {
- 'type': 'heartbeat',
- 'timestamp': int(current_time),
- 'message': '连接正常'
- }
- yield f"event: heartbeat\ndata: {json.dumps(heartbeat_data)}\n\n"
- logger.debug(f"发送心跳: {client_id}")
- last_heartbeat = current_time
-
- time.sleep(check_interval)
-
- except Exception as e:
- logger.error(f"SSE 流处理错误: {str(e)}")
- # 发生错误时发送错误信息
- yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
- time.sleep(check_interval)
-
- except GeneratorExit:
- # 客户端断开连接
- logger.info(f"SSE 客户端断开: {client_id}")
- except Exception as e:
- logger.error(f"SSE 流异常: {str(e)}")
- finally:
- # 清理连接
- with connection_lock:
- connections.pop(client_id, None)
- logger.info(f"SSE 连接清理完成: {client_id}")
-
- response = StreamingHttpResponse(
- event_stream(),
- content_type='text/event-stream'
- )
- response['Cache-Control'] = 'no-cache'
- response['X-Accel-Buffering'] = 'no' # 禁用 nginx 缓冲
- # 注意:不要设置 Connection 头,它是 hop-by-hop 头,由服务器自动处理
- # HTTP/1.1 默认就是 keep-alive
-
- return response
|