Browse Source

Eventloop: Connection errors in timers should propagate

Ask Solem 12 years ago
parent
commit
35a2d8a041
2 changed files with 14 additions and 6 deletions
  1. 2 2
      celery/worker/consumer.py
  2. 12 4
      celery/worker/hub.py

+ 2 - 2
celery/worker/consumer.py

@@ -419,6 +419,7 @@ class Consumer(object):
             drain_nowait = connection.drain_nowait
             on_task_callbacks = hub.on_task
             keep_draining = connection.transport.nb_keep_draining
+            errors = connection.connection_errors
 
             if hb and connection.supports_heartbeats:
                 hub.timer.apply_interval(
@@ -437,7 +438,6 @@ class Consumer(object):
                     self.handle_unknown_task(body, message, exc)
                 except InvalidTaskError, exc:
                     self.handle_invalid_task(body, message, exc)
-                #fire_timers()
 
             self.task_consumer.callbacks = [on_task_received]
             self.task_consumer.consume()
@@ -453,7 +453,7 @@ class Consumer(object):
 
                 # fire any ready timers, this also returns
                 # the number of seconds until we need to fire timers again.
-                poll_timeout = fire_timers() if scheduled else 1
+                poll_timeout = fire_timers(propagate=errors) if scheduled else 1
 
                 # We only update QoS when there is no more messages to read.
                 # This groups together qos calls, and makes sure that remote

+ 12 - 4
celery/worker/hub.py

@@ -11,8 +11,10 @@ from __future__ import absolute_import
 from kombu.utils import cached_property
 from kombu.utils import eventio
 
+from celery.utils.log import get_logger
 from celery.utils.timer2 import Schedule
 
+logger = get_logger(__name__)
 READ, WRITE, ERR = eventio.READ, eventio.WRITE, eventio.ERR
 
 
@@ -143,15 +145,21 @@ class Hub(object):
         for callback in self.on_init:
             callback(self)
 
-    def fire_timers(self, min_delay=1, max_delay=10, max_timers=10):
+    def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
+                    propagate=()):
         delay = None
         if self.timer._queue:
-            for i in xrange(max_timers):
+            for i in range(max_timers):
                 delay, entry = self.scheduler.next()
                 if entry is None:
                     break
-                self.timer.apply_entry(entry)
-        return min(max(delay, min_delay), max_delay)
+                try:
+                    entry()
+                except propagate:
+                    raise
+                except Exception, exc:
+                    logger.error('Error in timer: %r', exc, exc_info=1)
+        return min(max(delay or 0, min_delay), max_delay)
 
     def add(self, fd, callback, flags):
         self.poller.register(fd, flags)