sse_views.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. """
  2. Server-Sent Events (SSE) 视图
  3. 用于实时推送 ERP 任务数变化
  4. """
  5. from django.http import StreamingHttpResponse
  6. from django.views.decorators.http import require_http_methods
  7. from django.views.decorators.csrf import csrf_exempt
  8. from django.utils.decorators import method_decorator
  9. from .models import InboundBill, OutboundBill
  10. import json
  11. import time
  12. import threading
  13. from django.db.models import Q
  14. import logging
  15. logger = logging.getLogger(__name__)
  16. # 全局连接管理器
  17. connections = {}
  18. connection_lock = threading.Lock()
  19. def get_erp_task_counts():
  20. """获取 ERP 任务数"""
  21. inbound_count = InboundBill.objects.filter(
  22. bound_status=0,
  23. is_delete=False
  24. ).count()
  25. outbound_count = OutboundBill.objects.filter(
  26. bound_status=0,
  27. is_delete=False
  28. ).count()
  29. return {
  30. 'inbound_tasks': inbound_count,
  31. 'outbound_tasks': outbound_count
  32. }
  33. @csrf_exempt
  34. @require_http_methods(["GET"])
  35. def erp_task_stream(request):
  36. """
  37. SSE 流式响应,推送 ERP 任务数变化
  38. 客户端通过 EventSource 连接此端点
  39. 注意:EventSource 会自动发送 cookies,所以认证应该可以正常工作
  40. """
  41. def event_stream():
  42. # 生成客户端 ID
  43. user_id = getattr(request.user, 'id', None) if hasattr(request, 'user') and request.user.is_authenticated else 'anonymous'
  44. client_id = f"{user_id}_{int(time.time())}"
  45. logger.info(f"SSE 连接建立: {client_id}")
  46. # 注册连接
  47. with connection_lock:
  48. connections[client_id] = True
  49. try:
  50. # 发送初始数据
  51. initial_data = get_erp_task_counts()
  52. yield f"data: {json.dumps(initial_data)}\n\n"
  53. # 上次的任务数
  54. last_inbound_count = initial_data['inbound_tasks']
  55. last_outbound_count = initial_data['outbound_tasks']
  56. # 检查间隔(秒)
  57. check_interval = 3
  58. # 心跳间隔(秒)
  59. heartbeat_interval = 30
  60. last_heartbeat = time.time()
  61. while True:
  62. # 检查连接是否仍然活跃
  63. if client_id not in connections:
  64. logger.info(f"SSE 连接已移除: {client_id}")
  65. break
  66. current_time = time.time()
  67. try:
  68. # 获取当前任务数
  69. current_counts = get_erp_task_counts()
  70. current_inbound = current_counts['inbound_tasks']
  71. current_outbound = current_counts['outbound_tasks']
  72. # 如果任务数发生变化,发送更新
  73. if (current_inbound != last_inbound_count or
  74. current_outbound != last_outbound_count):
  75. logger.debug(f"任务数更新: 入库={current_inbound}, 出库={current_outbound}")
  76. yield f"data: {json.dumps(current_counts)}\n\n"
  77. last_inbound_count = current_inbound
  78. last_outbound_count = current_outbound
  79. # 发送心跳(保持连接活跃)
  80. # 使用事件格式而不是注释,这样前端可以看到心跳
  81. if current_time - last_heartbeat >= heartbeat_interval:
  82. heartbeat_data = {
  83. 'type': 'heartbeat',
  84. 'timestamp': int(current_time),
  85. 'message': '连接正常'
  86. }
  87. yield f"event: heartbeat\ndata: {json.dumps(heartbeat_data)}\n\n"
  88. logger.debug(f"发送心跳: {client_id}")
  89. last_heartbeat = current_time
  90. time.sleep(check_interval)
  91. except Exception as e:
  92. logger.error(f"SSE 流处理错误: {str(e)}")
  93. # 发生错误时发送错误信息
  94. yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
  95. time.sleep(check_interval)
  96. except GeneratorExit:
  97. # 客户端断开连接
  98. logger.info(f"SSE 客户端断开: {client_id}")
  99. except Exception as e:
  100. logger.error(f"SSE 流异常: {str(e)}")
  101. finally:
  102. # 清理连接
  103. with connection_lock:
  104. connections.pop(client_id, None)
  105. logger.info(f"SSE 连接清理完成: {client_id}")
  106. response = StreamingHttpResponse(
  107. event_stream(),
  108. content_type='text/event-stream'
  109. )
  110. response['Cache-Control'] = 'no-cache'
  111. response['X-Accel-Buffering'] = 'no' # 禁用 nginx 缓冲
  112. # 注意:不要设置 Connection 头,它是 hop-by-hop 头,由服务器自动处理
  113. # HTTP/1.1 默认就是 keep-alive
  114. return response