# sse_views.py
  1. """
  2. Server-Sent Events (SSE) 视图
  3. 用于实时推送 ERP 任务数变化
  4. """
  5. from django.http import StreamingHttpResponse
  6. from django.views.decorators.http import require_http_methods
  7. from django.views.decorators.csrf import csrf_exempt
  8. from django.utils.decorators import method_decorator
  9. from django.core.cache import cache
  10. from .models import InboundBill, OutboundBill
  11. import json
  12. import time
  13. import threading
  14. from django.db.models import Q
  15. import logging
  16. logger = logging.getLogger(__name__)
  17. # 全局连接管理器
  18. connections = {}
  19. connection_lock = threading.Lock()
  20. # 最大连接数限制(防止资源耗尽)
  21. MAX_CONNECTIONS = 100
  22. # 连接超时时间(秒)- 超过此时间未活跃的连接将被清理
  23. CONNECTION_TIMEOUT = 300 # 5分钟
  24. # 缓存键名
  25. CACHE_KEY_TASK_COUNTS = 'erp_task_counts'
  26. CACHE_TIMEOUT = 300 # 缓存5分钟,由信号处理自动清除
  27. def cleanup_stale_connections():
  28. """
  29. 清理超时的连接(定期调用)
  30. 可以放在后台任务中定期执行
  31. """
  32. current_time = time.time()
  33. cleaned = []
  34. with connection_lock:
  35. to_remove = []
  36. for client_id, info in connections.items():
  37. if current_time - info['last_active'] > CONNECTION_TIMEOUT:
  38. to_remove.append(client_id)
  39. cleaned.append(client_id)
  40. for client_id in to_remove:
  41. connections.pop(client_id, None)
  42. if cleaned:
  43. logger.info(f"清理了 {len(cleaned)} 个超时连接")
  44. return len(cleaned)
  45. def get_erp_task_counts():
  46. """
  47. 获取 ERP 任务数(带缓存优化)
  48. 使用缓存避免每个连接都查询数据库
  49. """
  50. # 尝试从缓存获取
  51. cached_data = cache.get(CACHE_KEY_TASK_COUNTS)
  52. if cached_data is not None:
  53. return cached_data
  54. # 缓存未命中,查询数据库
  55. inbound_count = InboundBill.objects.filter(
  56. bound_status=0,
  57. is_delete=False
  58. ).count()
  59. outbound_count = OutboundBill.objects.filter(
  60. bound_status=0,
  61. is_delete=False
  62. ).count()
  63. result = {
  64. 'inbound_tasks': inbound_count,
  65. 'outbound_tasks': outbound_count
  66. }
  67. # 存入缓存(所有连接共享)
  68. cache.set(CACHE_KEY_TASK_COUNTS, result, CACHE_TIMEOUT)
  69. return result
  70. @csrf_exempt
  71. @require_http_methods(["GET"])
  72. def erp_task_stream(request):
  73. """
  74. SSE 流式响应,推送 ERP 任务数变化
  75. 客户端通过 EventSource 连接此端点
  76. 注意:EventSource 会自动发送 cookies,所以认证应该可以正常工作
  77. 重要:在生成器外部获取用户信息,避免在流处理期间访问会话
  78. """
  79. # 在生成器外部获取用户信息,避免在流处理期间访问会话导致会话解码错误
  80. try:
  81. if hasattr(request, 'user') and request.user.is_authenticated:
  82. user_id = getattr(request.user, 'id', None)
  83. else:
  84. user_id = 'anonymous'
  85. except Exception as e:
  86. # 如果访问用户信息时出错(可能因为会话损坏),使用匿名用户
  87. logger.warning(f"获取用户信息失败,使用匿名用户: {str(e)}")
  88. user_id = 'anonymous'
  89. client_id = f"{user_id}_{int(time.time())}"
  90. logger.info(f"SSE 连接建立: {client_id}")
  91. def event_stream():
  92. """
  93. 流式响应生成器
  94. 注意:在生成器内部不要访问 request.session 或 request.user
  95. 这些信息已在生成器外部获取,避免会话解码错误
  96. """
  97. # 注册连接(检查连接数限制)
  98. with connection_lock:
  99. if len(connections) >= MAX_CONNECTIONS:
  100. logger.warning(f"SSE 连接数已达上限 ({MAX_CONNECTIONS}),拒绝新连接: {client_id}")
  101. yield f"event: error\ndata: {json.dumps({'error': '连接数已达上限,请稍后重试'})}\n\n"
  102. return
  103. connections[client_id] = {
  104. 'created_at': time.time(),
  105. 'last_active': time.time()
  106. }
  107. try:
  108. # 发送初始数据
  109. initial_data = get_erp_task_counts()
  110. yield f"data: {json.dumps(initial_data)}\n\n"
  111. # 上次的任务数
  112. last_inbound_count = initial_data['inbound_tasks']
  113. last_outbound_count = initial_data['outbound_tasks']
  114. # 检查间隔(秒)- 事件驱动:任务创建时会清除缓存,这里只需定期检查缓存
  115. # 由于缓存会被信号自动清除,检查间隔可以更长
  116. check_interval = 30
  117. # 心跳间隔(秒)
  118. heartbeat_interval = 30
  119. last_heartbeat = time.time()
  120. while True:
  121. # 检查连接是否仍然活跃
  122. if client_id not in connections:
  123. logger.info(f"SSE 连接已移除: {client_id}")
  124. break
  125. current_time = time.time()
  126. try:
  127. # 更新连接活跃时间
  128. with connection_lock:
  129. if client_id in connections:
  130. connections[client_id]['last_active'] = current_time
  131. # 获取当前任务数(使用共享缓存,减少数据库查询)
  132. current_counts = get_erp_task_counts()
  133. current_inbound = current_counts['inbound_tasks']
  134. current_outbound = current_counts['outbound_tasks']
  135. # 如果任务数发生变化,发送更新
  136. if (current_inbound != last_inbound_count or
  137. current_outbound != last_outbound_count):
  138. logger.debug(f"任务数更新: 入库={current_inbound}, 出库={current_outbound}")
  139. yield f"data: {json.dumps(current_counts)}\n\n"
  140. last_inbound_count = current_inbound
  141. last_outbound_count = current_outbound
  142. # 发送心跳(保持连接活跃)
  143. # 使用事件格式而不是注释,这样前端可以看到心跳
  144. if current_time - last_heartbeat >= heartbeat_interval:
  145. heartbeat_data = {
  146. 'type': 'heartbeat',
  147. 'timestamp': int(current_time),
  148. 'message': '连接正常'
  149. }
  150. yield f"event: heartbeat\ndata: {json.dumps(heartbeat_data)}\n\n"
  151. logger.debug(f"发送心跳: {client_id}")
  152. last_heartbeat = current_time
  153. time.sleep(check_interval)
  154. except Exception as e:
  155. logger.error(f"SSE 流处理错误: {str(e)}")
  156. # 发生错误时发送错误信息
  157. yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
  158. time.sleep(check_interval)
  159. except GeneratorExit:
  160. # 客户端断开连接
  161. logger.info(f"SSE 客户端断开: {client_id}")
  162. except Exception as e:
  163. logger.error(f"SSE 流异常: {str(e)}")
  164. finally:
  165. # 清理连接
  166. with connection_lock:
  167. connections.pop(client_id, None)
  168. logger.info(f"SSE 连接清理完成: {client_id}")
  169. response = StreamingHttpResponse(
  170. event_stream(),
  171. content_type='text/event-stream'
  172. )
  173. response['Cache-Control'] = 'no-cache'
  174. response['X-Accel-Buffering'] = 'no' # 禁用 nginx 缓冲
  175. # 注意:不要设置 Connection 头,它是 hop-by-hop 头,由服务器自动处理
  176. # HTTP/1.1 默认就是 keep-alive
  177. return response