ソースを参照

Eventloop: Connection errors in timers should propagate

Ask Solem 12 年 前
コミット
9897e9eb34
2 ファイル変更12 行追加3 行削除
  1. 10 2
      celery/worker/hub.py
  2. 2 1
      celery/worker/loops.py

+ 10 - 2
celery/worker/hub.py

@@ -12,8 +12,10 @@ from kombu.utils import cached_property
 from kombu.utils import eventio
 
 from celery.five import items, range
+from celery.utils.log import get_logger
 from celery.utils.timer2 import Schedule
 
+logger = get_logger(__name__)
 READ, WRITE, ERR = eventio.READ, eventio.WRITE, eventio.ERR
 
 
@@ -144,14 +146,20 @@ class Hub(object):
         for callback in self.on_init:
             callback(self)
 
-    def fire_timers(self, min_delay=1, max_delay=10, max_timers=10):
+    def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
+                    propagate=()):
         delay = None
         if self.timer._queue:
             for i in range(max_timers):
                 delay, entry = next(self.scheduler)
                 if entry is None:
                     break
-                self.timer.apply_entry(entry)
+                try:
+                    entry()
+                except propagate:
+                    raise
+                except Exception, exc:
+                    logger.error('Error in timer: %r', exc, exc_info=1)
         return min(max(delay or 0, min_delay), max_delay)
 
     def add(self, fd, callback, flags):

+ 2 - 1
celery/worker/loops.py

@@ -40,6 +40,7 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
         drain_nowait = connection.drain_nowait
         on_task_callbacks = hub.on_task
         keep_draining = connection.transport.nb_keep_draining
+        errors = connection.connection_errors
 
         if heartbeat and connection.supports_heartbeats:
             hub.timer.apply_interval(
@@ -73,7 +74,7 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
 
             # fire any ready timers, this also returns
             # the number of seconds until we need to fire timers again.
-            poll_timeout = fire_timers() if scheduled else 1
+            poll_timeout = fire_timers(propagate=errors) if scheduled else 1
 
             # We only update QoS when there is no more messages to read.
             # This groups together qos calls, and makes sure that remote