flower_bs committed 1 month ago
commit 9c88f65210
3 changed files with 128 additions and 7 deletions
  1. erp/apps.py       +4  -0
  2. erp/signals.py    +55 -0
  3. erp/sse_views.py  +69 -7

+ 4 - 0
erp/apps.py

@@ -4,3 +4,7 @@ from django.apps import AppConfig
 class ErpConfig(AppConfig):
     default_auto_field = 'django.db.models.BigAutoField'
     name = 'erp'
+    
+    def ready(self):
+        """应用启动时导入信号处理器"""
+        import erp.signals  # noqa

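Note: ready() only runs when the app is installed through this AppConfig. With Django 3.2+ it is enough to list the plain app name in INSTALLED_APPS; Django resolves it to ErpConfig automatically. A minimal settings sketch (assuming a standard project-level settings.py):

# settings.py (sketch; other apps omitted)
INSTALLED_APPS = [
    # ...
    'erp',  # resolved to erp.apps.ErpConfig; ready() then imports erp.signals and registers the receivers
]
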
+ 55 - 0
erp/signals.py

@@ -0,0 +1,55 @@
+"""
+Signal handlers for the ERP module.
+Clear the task-count cache whenever a task is created/updated so changes can be pushed in real time.
+"""
+from django.db.models.signals import post_save, post_delete
+from django.dispatch import receiver
+from django.core.cache import cache
+from .models import InboundBill, OutboundBill
+import logging
+
+logger = logging.getLogger(__name__)
+
+# Cache key name (keep it consistent with sse_views.py)
+CACHE_KEY_TASK_COUNTS = 'erp_task_counts'
+
+
+def invalidate_task_counts_cache():
+    """清除任务数缓存"""
+    cache.delete(CACHE_KEY_TASK_COUNTS)
+    logger.debug("已清除 ERP 任务数缓存")
+
+
+@receiver(post_save, sender=InboundBill)
+def inbound_bill_saved(sender, instance, **kwargs):
+    """入库单保存时清除缓存"""
+    # 只要 bound_status 或 is_delete 字段变化,都可能影响任务数,清除缓存
+    # 任务数统计条件:bound_status=0 且 is_delete=False
+    # 所以任何保存操作都可能影响计数,直接清除缓存
+    invalidate_task_counts_cache()
+    logger.debug(f"入库单 {instance.number} 保存(bound_status={instance.bound_status}, is_delete={instance.is_delete}),已清除任务数缓存")
+
+
+@receiver(post_save, sender=OutboundBill)
+def outbound_bill_saved(sender, instance, **kwargs):
+    """出库单保存时清除缓存"""
+    # 只要 bound_status 或 is_delete 字段变化,都可能影响任务数,清除缓存
+    # 任务数统计条件:bound_status=0 且 is_delete=False
+    # 所以任何保存操作都可能影响计数,直接清除缓存
+    invalidate_task_counts_cache()
+    logger.debug(f"出库单 {instance.number} 保存(bound_status={instance.bound_status}, is_delete={instance.is_delete}),已清除任务数缓存")
+
+
+@receiver(post_delete, sender=InboundBill)
+def inbound_bill_deleted(sender, instance, **kwargs):
+    """入库单删除时清除缓存"""
+    invalidate_task_counts_cache()
+    logger.debug(f"入库单 {instance.number} 删除,已清除任务数缓存")
+
+
+@receiver(post_delete, sender=OutboundBill)
+def outbound_bill_deleted(sender, instance, **kwargs):
+    """出库单删除时清除缓存"""
+    invalidate_task_counts_cache()
+    logger.debug(f"出库单 {instance.number} 删除,已清除任务数缓存")
+

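A small unit test is an easy way to confirm the invalidation wiring: warm the cache, save a bill, and check that the key is gone. The sketch below is illustrative only; the arguments to objects.create() are assumptions, since the diff only shows the number, bound_status and is_delete fields and InboundBill may require more.

# erp/tests_signals.py - minimal sketch; adjust create() to the real model fields
from django.core.cache import cache
from django.test import TestCase

from erp.models import InboundBill
from erp.sse_views import CACHE_KEY_TASK_COUNTS, get_erp_task_counts


class TaskCountCacheInvalidationTest(TestCase):
    def test_saving_inbound_bill_clears_cache(self):
        get_erp_task_counts()  # warm the shared cache
        self.assertIsNotNone(cache.get(CACHE_KEY_TASK_COUNTS))

        # Field values are placeholders; only these three fields appear in the diff.
        InboundBill.objects.create(number='IN-0001', bound_status=0, is_delete=False)

        # The post_save receiver should have deleted the cached counts.
        self.assertIsNone(cache.get(CACHE_KEY_TASK_COUNTS))
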
+ 69 - 7
erp/sse_views.py

@@ -6,6 +6,7 @@ from django.http import StreamingHttpResponse
 from django.views.decorators.http import require_http_methods
 from django.views.decorators.csrf import csrf_exempt
 from django.utils.decorators import method_decorator
+from django.core.cache import cache
 from .models import InboundBill, OutboundBill
 import json
 import time
@@ -19,9 +20,52 @@ logger = logging.getLogger(__name__)
 connections = {}
 connection_lock = threading.Lock()
 
+# Maximum number of connections (prevents resource exhaustion)
+MAX_CONNECTIONS = 100
+
+# Connection timeout (seconds): connections inactive longer than this are cleaned up
+CONNECTION_TIMEOUT = 300  # 5 minutes
+
+# Cache key name
+CACHE_KEY_TASK_COUNTS = 'erp_task_counts'
+CACHE_TIMEOUT = 300  # cache for 5 minutes; the signal handlers clear it automatically
+
+
+def cleanup_stale_connections():
+    """
+    Clean up stale connections (intended to be called periodically).
+    Can be run on a schedule from a background task.
+    """
+    current_time = time.time()
+    cleaned = []
+    
+    with connection_lock:
+        to_remove = []
+        for client_id, info in connections.items():
+            if current_time - info['last_active'] > CONNECTION_TIMEOUT:
+                to_remove.append(client_id)
+                cleaned.append(client_id)
+        
+        for client_id in to_remove:
+            connections.pop(client_id, None)
+    
+    if cleaned:
+        logger.info(f"清理了 {len(cleaned)} 个超时连接")
+    
+    return len(cleaned)
+
 
 def get_erp_task_counts():
-    """获取 ERP 任务数"""
+    """
+    获取 ERP 任务数(带缓存优化)
+    使用缓存避免每个连接都查询数据库
+    """
+    # 尝试从缓存获取
+    cached_data = cache.get(CACHE_KEY_TASK_COUNTS)
+    if cached_data is not None:
+        return cached_data
+    
+    # Cache miss: query the database
     inbound_count = InboundBill.objects.filter(
         bound_status=0,
         is_delete=False
@@ -32,10 +76,15 @@ def get_erp_task_counts():
         is_delete=False
     ).count()
     
-    return {
+    result = {
         'inbound_tasks': inbound_count,
         'outbound_tasks': outbound_count
     }
+    
+    # Store the result in the cache (shared by all connections)
+    cache.set(CACHE_KEY_TASK_COUNTS, result, CACHE_TIMEOUT)
+    
+    return result
 
 
 @csrf_exempt
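One assumption behind this shared cache: signal-driven invalidation only reaches every worker if the configured CACHES backend is external (Redis, Memcached, database). With the default locmem backend each process keeps its own cache, so a save handled by one worker would not clear the copies held by the others. A sketch of a Redis-backed setting (the URL is a placeholder; the built-in backend requires Django 4.0+):

# settings.py (sketch)
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
    }
}
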
@@ -54,9 +103,16 @@ def erp_task_stream(request):
         
         logger.info(f"SSE connection established: {client_id}")
         
-        # Register the connection
+        # Register the connection (enforce the connection limit)
         with connection_lock:
-            connections[client_id] = True
+            if len(connections) >= MAX_CONNECTIONS:
+                logger.warning(f"SSE connection limit reached ({MAX_CONNECTIONS}); rejecting new connection: {client_id}")
+                yield f"event: error\ndata: {json.dumps({'error': 'Connection limit reached, please try again later'})}\n\n"
+                return
+            connections[client_id] = {
+                'created_at': time.time(),
+                'last_active': time.time()
+            }
         
         try:
             # Send the initial data
@@ -67,8 +123,9 @@ def erp_task_stream(request):
             last_inbound_count = initial_data['inbound_tasks']
             last_outbound_count = initial_data['outbound_tasks']
             
-            # Check interval (seconds)
-            check_interval = 3
+            # Check interval (seconds). Event-driven: saving a task clears the cache,
+            # so this loop only polls the shared cache; since the signals keep it fresh, the interval can be longer.
+            check_interval = 30
             
             # Heartbeat interval (seconds)
             heartbeat_interval = 30
@@ -83,7 +140,12 @@ def erp_task_stream(request):
                 current_time = time.time()
                 
                 try:
-                    # Get the current task counts
+                    # Update the connection's last-active timestamp
+                    with connection_lock:
+                        if client_id in connections:
+                            connections[client_id]['last_active'] = current_time
+                    
+                    # Get the current task counts (the shared cache keeps database queries down)
                     current_counts = get_erp_task_counts()
                     current_inbound = current_counts['inbound_tasks']
                     current_outbound = current_counts['outbound_tasks']
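
cleanup_stale_connections() still needs something to call it. Below is a minimal sketch of a periodic cleaner, assuming a daemon thread started once per process (for example from ErpConfig.ready()) is acceptable; a Celery beat task or cron job would do the same job. start_connection_cleaner is a hypothetical helper, not part of this commit.

# Hypothetical helper; not part of this commit.
import threading
import time

from erp import sse_views


def _connection_cleaner(interval=60):
    # Runs forever in a daemon thread, sweeping stale SSE connections.
    while True:
        time.sleep(interval)
        sse_views.cleanup_stale_connections()


def start_connection_cleaner():
    threading.Thread(target=_connection_cleaner, daemon=True,
                     name='sse-connection-cleaner').start()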