Browse Source

[Worker] Refactor Mingle to be reusable

Ask Solem 9 years ago
parent
commit
07035f627b
2 changed files with 63 additions and 53 deletions
  1. 36 38
      celery/tests/worker/test_consumer.py
  2. 27 15
      celery/worker/consumer/mingle.py

+ 36 - 38
celery/tests/worker/test_consumer.py

@@ -6,7 +6,6 @@ import socket
 from billiard.exceptions import RestartFreqExceeded
 
 from celery.datastructures import LimitedSet
-from celery.worker import state as worker_state
 from celery.worker.consumer.agent import Agent
 from celery.worker.consumer.consumer import CLOSE, Consumer, dump_body
 from celery.worker.consumer.gossip import Gossip
@@ -278,43 +277,42 @@ class test_Mingle(AppCase):
         mingle.start(c)
 
     def test_start(self):
-        try:
-            c = Mock()
-            c.app.connection_for_read = _amqp_connection()
-            mingle = Mingle(c)
-            self.assertTrue(mingle.enabled)
-
-            Aig = LimitedSet()
-            Big = LimitedSet()
-            Aig.add('Aig-1')
-            Aig.add('Aig-2')
-            Big.add('Big-1')
-
-            I = c.app.control.inspect.return_value = Mock()
-            I.hello.return_value = {
-                'A@example.com': {
-                    'clock': 312,
-                    'revoked': Aig._data,
-                },
-                'B@example.com': {
-                    'clock': 29,
-                    'revoked': Big._data,
-                },
-                'C@example.com': {
-                    'error': 'unknown method',
-                },
-            }
-
-            mingle.start(c)
-            I.hello.assert_called_with(c.hostname, worker_state.revoked._data)
-            c.app.clock.adjust.assert_has_calls([
-                call(312), call(29),
-            ], any_order=True)
-            self.assertIn('Aig-1', worker_state.revoked)
-            self.assertIn('Aig-2', worker_state.revoked)
-            self.assertIn('Big-1', worker_state.revoked)
-        finally:
-            worker_state.revoked.clear()
+        c = Mock()
+        c.app.connection_for_read = _amqp_connection()
+        mingle = Mingle(c)
+        self.assertTrue(mingle.enabled)
+
+        Aig = LimitedSet()
+        Big = LimitedSet()
+        Aig.add('Aig-1')
+        Aig.add('Aig-2')
+        Big.add('Big-1')
+
+        I = c.app.control.inspect.return_value = Mock()
+        I.hello.return_value = {
+            'A@example.com': {
+                'clock': 312,
+                'revoked': Aig._data,
+            },
+            'B@example.com': {
+                'clock': 29,
+                'revoked': Big._data,
+            },
+            'C@example.com': {
+                'error': 'unknown method',
+            },
+        }
+
+        our_revoked = c.controller.state.revoked = LimitedSet()
+
+        mingle.start(c)
+        I.hello.assert_called_with(c.hostname, our_revoked._data)
+        c.app.clock.adjust.assert_has_calls([
+            call(312), call(29),
+        ], any_order=True)
+        self.assertIn('Aig-1', our_revoked)
+        self.assertIn('Aig-2', our_revoked)
+        self.assertIn('Big-1', our_revoked)
 
 
 def _amqp_connection():

+ 27 - 15
celery/worker/consumer/mingle.py

@@ -3,11 +3,9 @@ from __future__ import absolute_import, unicode_literals
 from operator import itemgetter
 
 from celery import bootsteps
-from celery.five import items, values
+from celery.five import items
 from celery.utils.log import get_logger
 
-from celery.worker.state import revoked
-
 from .events import Events
 
 __all__ = ['Mingle']
@@ -15,7 +13,7 @@ __all__ = ['Mingle']
 MINGLE_GET_FIELDS = itemgetter('clock', 'revoked')
 
 logger = get_logger(__name__)
-info = logger.info
+debug, info, exception = logger.debug, logger.info, logger.exception
 
 
 class Mingle(bootsteps.StartStopStep):
@@ -34,20 +32,34 @@ class Mingle(bootsteps.StartStopStep):
     def start(self, c):
         info('mingle: searching for neighbors')
         I = c.app.control.inspect(timeout=1.0, connection=c.connection)
-        replies = I.hello(c.hostname, revoked._data) or {}
-        replies.pop(c.hostname, None)
+        our_revoked = c.controller.state.revoked
+        replies = I.hello(c.hostname, our_revoked._data) or {}
+        replies.pop(c.hostname, None)  # delete my own response
         if replies:
             info('mingle: sync with %s nodes',
                  len([reply for reply, value in items(replies) if value]))
-            for reply in values(replies):
-                if reply:
-                    try:
-                        other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
-                    except KeyError:  # reply from pre-3.1 worker
-                        pass
-                    else:
-                        c.app.clock.adjust(other_clock)
-                        revoked.update(other_revoked)
+            [self.on_node_reply(c, nodename, reply)
+             for nodename, reply in items(replies) if reply]
             info('mingle: sync complete')
         else:
             info('mingle: all alone')
+
+    def on_node_reply(self, c, nodename, reply):
+        debug('mingle: processing reply from %s', nodename)
+        try:
+            self.sync_with_node(c, **reply)
+        except MemoryError:
+            raise
+        except Exception as exc:
+            exception('mingle: sync with %s failed: %r', nodename, exc)
+
+    def sync_with_node(self, c, clock=None, revoked=None, **kwargs):
+        self.on_clock_event(c, clock)
+        self.on_revoked_received(c, revoked)
+
+    def on_clock_event(self, c, clock):
+        c.app.clock.adjust(clock) if clock else c.app.clock.forward()
+
+    def on_revoked_received(self, c, revoked):
+        if revoked:
+            c.controller.state.revoked.update(revoked)