Browse Source

Mingle/Gossip sync with originator

Ask Solem 11 years ago
parent
commit
f6bef0230a
3 changed files with 37 additions and 30 deletions
  1. 2 2
      celery/app/control.py
  2. 28 25
      celery/worker/consumer.py
  3. 7 3
      celery/worker/control.py

+ 2 - 2
celery/app/control.py

@@ -104,8 +104,8 @@ class Inspect(object):
     def conf(self):
         return self._request('dump_conf')
 
-    def hello(self):
-        return self._request('hello')
+    def hello(self, from_node, revoked=None):
+        return self._request('hello', from_node=from_node, revoked=revoked)
 
     def memsample(self):
         return self._request('memsample')

+ 28 - 25
celery/worker/consumer.py

@@ -544,31 +544,6 @@ class Agent(bootsteps.StartStopStep):
         return agent
 
 
-class Mingle(bootsteps.StartStopStep):
-    label = 'Mingle'
-    requires = (Connection, )
-
-    def __init__(self, c, enable_mingle=True, **kwargs):
-        self.enabled = enable_mingle
-
-    def start(self, c):
-        info('mingle: searching for neighbors')
-        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
-        replies = I.hello()
-        if replies:
-            for reply in values(replies):
-                try:
-                    other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
-                except KeyError:  # reply from pre-3.1 worker
-                    pass
-                else:
-                    c.app.clock.adjust(other_clock)
-                    revoked.update(other_revoked)
-            info('mingle: synced with %s', ', '.join(replies))
-        else:
-            info('mingle: no one here')
-
-
 class Gossip(bootsteps.ConsumerStep):
     label = 'Gossip'
     requires = (Events, )
@@ -719,6 +694,34 @@ class Gossip(bootsteps.ConsumerStep):
             self.clock.forward()
 
 
+class Mingle(bootsteps.StartStopStep):
+    label = 'Mingle'
+    requires = (Gossip, )
+
+    def __init__(self, c, enable_mingle=True, **kwargs):
+        self.enabled = enable_mingle
+
+    def start(self, c):
+        info('mingle: searching for neighbors')
+        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
+        replies = I.hello(c.hostname, revoked._data)
+        replies.pop(c.hostname, None)
+        if replies:
+            info('mingle: hello %s! sync with me',
+                 ', '.join(reply for reply, value in items(replies) if value))
+            for reply in values(replies):
+                if reply:
+                    try:
+                        other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
+                    except KeyError:  # reply from pre-3.1 worker
+                        pass
+                    else:
+                        c.app.clock.adjust(other_clock)
+                        revoked.update(other_revoked)
+        else:
+            info('mingle: all alone')
+
+
 class Evloop(bootsteps.StartStopStep):
     label = 'event loop'
     last = True

+ 7 - 3
celery/worker/control.py

@@ -246,9 +246,13 @@ def dump_revoked(state, **kwargs):
 
 
 @Panel.register
-def hello(state, **kwargs):
-    return {'revoked': worker_state.revoked._data,
-            'clock': state.app.clock.forward()}
+def hello(state, from_node, revoked=None, **kwargs):
+    if from_node != state.hostname:
+        logger.info('sync with %s', from_node)
+        worker_state.revoked.update(revoked)
+        return  {'revoked': worker_state.revoked._data,
+                 'clock': state.app.clock.forward()}
+
 
 
 @Panel.register