Преглед изворни кода

Timer bootstep must start before Hub

Ask Solem пре 11 година
родитељ
комит
a908acce5e
1 измењених фајлова са 25 додато и 25 уклоњено
  1. 25 25
      celery/worker/components.py

+ 25 - 25
celery/worker/components.py

@@ -30,7 +30,32 @@ class Object(object):  # XXX
     pass
 
 
+class Timer(bootsteps.Step):
+    """This step initializes the internal timer used by the worker."""
+
+    def create(self, w):
+        if w.use_eventloop:
+            # does not use dedicated timer thread.
+            w.timer = Schedule(max_interval=10.0)
+        else:
+            if not w.timer_cls:
+                # Default Timer is set by the pool, as e.g. eventlet
+                # needs a custom implementation.
+                w.timer_cls = w.pool_cls.Timer
+            w.timer = self.instantiate(w.timer_cls,
+                                       max_interval=w.timer_precision,
+                                       on_timer_error=self.on_timer_error,
+                                       on_timer_tick=self.on_timer_tick)
+
+    def on_timer_error(self, exc):
+        logger.error('Timer error: %r', exc, exc_info=True)
+
+    def on_timer_tick(self, delay):
+        logger.debug('Timer wake-up! Next eta %s secs.', delay)
+
+
 class Hub(bootsteps.StartStopStep):
+    requires = (Timer, )
 
     def __init__(self, w, **kwargs):
         w.hub = None
@@ -39,7 +64,6 @@ class Hub(bootsteps.StartStopStep):
         return w.use_eventloop
 
     def create(self, w):
-        w.timer = Schedule(max_interval=10)
         w.hub = hub.Hub(w.timer)
         return w.hub
 
@@ -151,30 +175,6 @@ class Beat(bootsteps.StartStopStep):
         return b
 
 
-class Timer(bootsteps.Step):
-    """This step initializes the internal timer used by the worker."""
-    requires = (Pool, )
-
-    def include_if(self, w):
-        return not w.use_eventloop
-
-    def create(self, w):
-        if not w.timer_cls:
-            # Default Timer is set by the pool, as e.g. eventlet
-            # needs a custom implementation.
-            w.timer_cls = w.pool.Timer
-        w.timer = self.instantiate(w.pool.Timer,
-                                   max_interval=w.timer_precision,
-                                   on_timer_error=self.on_timer_error,
-                                   on_timer_tick=self.on_timer_tick)
-
-    def on_timer_error(self, exc):
-        logger.error('Timer error: %r', exc, exc_info=True)
-
-    def on_timer_tick(self, delay):
-        logger.debug('Timer wake-up! Next eta %s secs.', delay)
-
-
 class StateDB(bootsteps.Step):
     """This bootstep sets up the workers state db if enabled."""