瀏覽代碼

Worker: Changes order of startup so that Mingle -> Control -> Gossip. Issue #1686

Ask Solem 11 年之前
父節點
當前提交
fa939b6623
共有 1 個文件被更改,包括 49 次插入48 次删除
  1. 49 48
      celery/worker/consumer.py

+ 49 - 48
celery/worker/consumer.py

@@ -536,20 +536,6 @@ class Heart(bootsteps.StartStopStep):
     shutdown = stop
 
 
-class Control(bootsteps.StartStopStep):
-    requires = (Events, )
-
-    def __init__(self, c, **kwargs):
-        self.is_green = c.pool is not None and c.pool.is_green
-        self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
-        self.start = self.box.start
-        self.stop = self.box.stop
-        self.shutdown = self.box.shutdown
-
-    def include_if(self, c):
-        return c.app.conf.CELERY_ENABLE_REMOTE_CONTROL
-
-
 class Tasks(bootsteps.StartStopStep):
     requires = (Events, )
 
@@ -592,9 +578,57 @@ class Agent(bootsteps.StartStopStep):
         return agent
 
 
+class Mingle(bootsteps.StartStopStep):
+    label = 'Mingle'
+    requires = (Events, )
+    compatible_transports = set(['amqp', 'redis'])
+
+    def __init__(self, c, without_mingle=False, **kwargs):
+        self.enabled = not without_mingle and self.compatible_transport(c.app)
+
+    def compatible_transport(self, app):
+        with app.connection() as conn:
+            return conn.transport.driver_type in self.compatible_transports
+
+    def start(self, c):
+        info('mingle: searching for neighbors')
+        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
+        replies = I.hello(c.hostname, revoked._data) or {}
+        replies.pop(c.hostname, None)
+        if replies:
+            info('mingle: sync with %s nodes',
+                 len([reply for reply, value in items(replies) if value]))
+            for reply in values(replies):
+                if reply:
+                    try:
+                        other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
+                    except KeyError:  # reply from pre-3.1 worker
+                        pass
+                    else:
+                        c.app.clock.adjust(other_clock)
+                        revoked.update(other_revoked)
+            info('mingle: sync complete')
+        else:
+            info('mingle: all alone')
+
+
+class Control(bootsteps.StartStopStep):
+    requires = (Mingle, )
+
+    def __init__(self, c, **kwargs):
+        self.is_green = c.pool is not None and c.pool.is_green
+        self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
+        self.start = self.box.start
+        self.stop = self.box.stop
+        self.shutdown = self.box.shutdown
+
+    def include_if(self, c):
+        return c.app.conf.CELERY_ENABLE_REMOTE_CONTROL
+
+
 class Gossip(bootsteps.ConsumerStep):
     label = 'Gossip'
-    requires = (Events, )
+    requires = (Mingle, )
     _cons_stamp_fields = itemgetter(
         'id', 'clock', 'hostname', 'pid', 'topic', 'action', 'cver',
     )
@@ -747,39 +781,6 @@ class Gossip(bootsteps.ConsumerStep):
             self.clock.forward()
 
 
-class Mingle(bootsteps.StartStopStep):
-    label = 'Mingle'
-    requires = (Gossip, )
-    compatible_transports = set(['amqp', 'redis'])
-
-    def __init__(self, c, without_mingle=False, **kwargs):
-        self.enabled = not without_mingle and self.compatible_transport(c.app)
-
-    def compatible_transport(self, app):
-        with app.connection() as conn:
-            return conn.transport.driver_type in self.compatible_transports
-
-    def start(self, c):
-        info('mingle: searching for neighbors')
-        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
-        replies = I.hello(c.hostname, revoked._data) or {}
-        replies.pop(c.hostname, None)
-        if replies:
-            info('mingle: sync with %s nodes',
-                 len([reply for reply, value in items(replies) if value]))
-            for reply in values(replies):
-                if reply:
-                    try:
-                        other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
-                    except KeyError:  # reply from pre-3.1 worker
-                        pass
-                    else:
-                        c.app.clock.adjust(other_clock)
-                        revoked.update(other_revoked)
-            info('mingle: sync complete')
-        else:
-            info('mingle: all alone')
-
 
 class Evloop(bootsteps.StartStopStep):
     label = 'event loop'