فهرست منبع

Enable AMQP heartbeat for gevent/eventlet and other thread-based pools. Closes #3338

Ask Solem 8 سال پیش
والد
کامیت
7193f391c3
2فایلهای تغییر یافته به همراه13 افزوده شده و 4 حذف شده
  1. 4 0
      celery/tests/worker/test_worker.py
  2. 9 4
      celery/worker/loops.py

+ 4 - 0
celery/tests/worker/test_worker.py

@@ -337,6 +337,10 @@ class test_Consumer(AppCase):
             def drain_events(self, **kwargs):
                 self.obj.connection = None
 
+            @property
+            def supports_heartbeats(self):
+                return False
+
         c = self.LoopConsumer()
         c.blueprint.state = RUN
         c.connection = Connection()

+ 9 - 4
celery/worker/loops.py

@@ -25,19 +25,23 @@ def _quick_drain(connection, timeout=0.1):
             raise
 
 
+def _enable_amqheartbeats(timer, connection, rate=2.0):
+    tick = connection.heartbeat_check
+    heartbeat = connection.get_heartbeat_interval()  # negotiated
+    if heartbeat and connection.supports_heartbeats:
+        timer.call_repeatedly(heartbeat / rate, tick, rate)
+
+
 def asynloop(obj, connection, consumer, blueprint, hub, qos,
              heartbeat, clock, hbrate=2.0, RUN=RUN):
     """Non-blocking event loop consuming messages until connection is lost,
     or shutdown is requested."""
     update_qos = qos.update
-    hbtick = connection.heartbeat_check
     errors = connection.connection_errors
-    heartbeat = connection.get_heartbeat_interval()  # negotiated
 
     on_task_received = obj.create_task_handler()
 
-    if heartbeat and connection.supports_heartbeats:
-        hub.call_repeatedly(heartbeat / hbrate, hbtick, hbrate)
+    _enable_amqheartbeats(hub, connection, rate=hbrate)
 
     consumer.on_message = on_task_received
     consumer.consume()
@@ -98,6 +102,7 @@ def synloop(obj, connection, consumer, blueprint, hub, qos,
     """Fallback blocking event loop for transports that doesn't support AIO."""
 
     on_task_received = obj.create_task_handler()
+    #_enable_amqheartbeats(obj.timer, connection, rate=hbrate)
     perform_pending_operations = obj.perform_pending_operations
     consumer.on_message = on_task_received
     consumer.consume()