
Gossip: Bootsteps can now hook into on_node_join/leave/lost

Ask Solem · 9 years ago
commit ee27089030
3 changed files with 88 additions and 2 deletions
  1. celery/tests/worker/test_consumer.py  (+22 -0)
  2. celery/worker/consumer.py  (+17 -0)
  3. docs/userguide/extending.rst  (+49 -2)

+ 22 - 0
celery/tests/worker/test_consumer.py

@@ -337,6 +337,28 @@ class test_Gossip(AppCase):
         self.assertTrue(g.enabled)
         self.assertIs(c.gossip, g)
 
+    def test_callbacks(self):
+        c = self.Consumer()
+        c.app.connection = _amqp_connection()
+        g = Gossip(c)
+        on_node_join = Mock(name='on_node_join')
+        on_node_join2 = Mock(name='on_node_join2')
+        on_node_leave = Mock(name='on_node_leave')
+        on_node_lost = Mock(name='on_node_lost')
+        g.on.node_join.add(on_node_join)
+        g.on.node_join.add(on_node_join2)
+        g.on.node_leave.add(on_node_leave)
+        g.on.node_lost.add(on_node_lost)
+
+        worker = Mock(name='worker')
+        g.on_node_join(worker)
+        on_node_join.assert_called_with(worker)
+        on_node_join2.assert_called_with(worker)
+        g.on_node_leave(worker)
+        on_node_leave.assert_called_with(worker)
+        g.on_node_lost(worker)
+        on_node_lost.assert_called_with(worker)
+
     def test_election(self):
         c = self.Consumer()
         c.app.connection_for_read = _amqp_connection()

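For context, here is the API surface this new test exercises, seen from a custom bootstep. A minimal sketch; ``MonitorStep`` and ``announce_join`` are hypothetical names, not part of this commit:

    from celery import bootsteps

    class MonitorStep(bootsteps.StartStopStep):
        # Depend on Gossip so that ``c.gossip`` exists when start() runs.
        requires = ('celery.worker.consumer:Gossip',)

        def start(self, c):
            # Each attribute of ``c.gossip.on`` is a plain set of
            # callables; every callback receives the
            # celery.events.state.Worker instance for the event.
            c.gossip.on.node_join.add(self.announce_join)

        def announce_join(self, worker):
            print('node joined: %s' % (worker.hostname,))

The step would be registered the usual way, e.g. with ``app.steps['consumer'].add(MonitorStep)``.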
+ 17 - 0
celery/worker/consumer.py

@@ -39,6 +39,7 @@ from celery.exceptions import InvalidTaskError, NotRegistered
 from celery.utils import gethostname
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
+from celery.utils.objects import Bunch
 from celery.utils.text import truncate
 from celery.utils.timeutils import humanize_seconds, rate
 
@@ -749,6 +750,11 @@ class Gossip(bootsteps.ConsumerStep):
         self.Receiver = c.app.events.Receiver
         self.hostname = c.hostname
         self.full_hostname = '.'.join([self.hostname, str(c.pid)])
+        self.on = Bunch(
+            node_join=set(),
+            node_leave=set(),
+            node_lost=set(),
+        )
 
         self.timer = c.timer
         if self.enabled:
@@ -836,12 +842,23 @@ class Gossip(bootsteps.ConsumerStep):
 
     def on_node_join(self, worker):
         debug('%s joined the party', worker.hostname)
+        self._call_handlers(self.on.node_join, worker)
 
     def on_node_leave(self, worker):
         debug('%s left', worker.hostname)
+        self._call_handlers(self.on.node_leave, worker)
 
     def on_node_lost(self, worker):
         info('missed heartbeat from %s', worker.hostname)
+        self._call_handlers(self.on.node_lost, worker)
+
+    def _call_handlers(self, handlers, *args, **kwargs):
+        for handler in handlers:
+            try:
+                handler(*args, **kwargs)
+            except Exception as exc:
+                error('Ignored error from handler %r: %r',
+                      handler, exc, exc_info=1)
 
     def register_timer(self):
         if self._tref is not None:
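``Bunch``, newly imported above from ``celery.utils.objects``, is a small attribute-access namespace; it is roughly equivalent to this sketch:

    class Bunch(object):
        """Namespace whose attributes are set from keyword arguments."""

        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

Using a ``Bunch`` of sets keeps the handler registry open-ended: ``g.on.node_join`` and friends are ordinary ``set`` objects, so callers simply ``add()`` and ``discard()`` callables.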

+ 49 - 2
docs/userguide/extending.rst

@@ -356,8 +356,55 @@ Attributes
 
     .. code-block:: python
 
-        class Step(bootsteps.StartStopStep):
-            requires = ('celery.worker.consumer:Events',)
+        class RatelimitStep(bootsteps.StartStopStep):
+            """Rate limit tasks based on the number of workers in the
+            cluster."""
+            requires = ('celery.worker.consumer:Gossip',)
+
+            def start(self, c):
+                self.c = c
+                self.c.gossip.on.node_join.add(self.on_cluster_size_change)
+                self.c.gossip.on.node_leave.add(self.on_cluster_size_change)
+                self.c.gossip.on.node_lost.add(self.on_node_lost)
+                self.tasks = [
+                    self.app.tasks['proj.tasks.add'],
+                    self.app.tasks['proj.tasks.mul'],
+                ]
+                self.last_size = None
+
+            def on_cluster_size_change(self, worker=None):
+                cluster_size = len(self.c.gossip.state.alive_workers())
+                if cluster_size != self.last_size:
+                    for task in self.tasks:
+                        task.rate_limit = 1.0 / cluster_size
+                    self.c.reset_rate_limits()
+                    self.last_size = cluster_size
+
+            def on_node_lost(self, worker):
+                # the heartbeat may have been processed too late, so wake up
+                # in a while to see if the worker recovered.
+                self.c.timer.call_after(10.0, self.on_cluster_size_change)
+
+    **Callbacks**
+
+    - ``gossip.on.node_join(worker)``
+
+        Called whenever a new node joins the cluster, providing a
+        :class:`~celery.events.state.Worker` instance.
+
+    - ``gossip.on.node_leave(worker)``
+
+        Called whenever a node leaves the cluster (shuts down),
+        providing a :class:`~celery.events.state.Worker` instance.
+
+    - ``gossip.on.node_lost(worker)``
+
+        Called whenever a heartbeat is missed for a worker instance in the
+        cluster (i.e. the heartbeat was not received, or not processed in
+        time), providing a :class:`~celery.events.state.Worker` instance.
+
+        This does not necessarily mean the worker is actually offline, so use a timeout
+        mechanism if the default heartbeat timeout is not sufficient (sketch below).
 
 .. attribute:: pool
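The ``node_lost`` note above recommends a timeout mechanism. A minimal sketch of one, assuming the event state is reachable as ``c.gossip.state.workers``; the ``OfflineDetector`` step, ``GRACE`` constant, and ``_maybe_offline`` helper are illustrative names, not part of this commit:

    from celery import bootsteps

    GRACE = 30.0  # illustrative grace period, in seconds

    class OfflineDetector(bootsteps.StartStopStep):
        requires = ('celery.worker.consumer:Gossip',)

        def start(self, c):
            self.c = c
            c.gossip.on.node_lost.add(self.on_node_lost)

        def on_node_lost(self, worker):
            # The heartbeat may simply have been processed late, so wait
            # for a grace period before treating the node as offline.
            self.c.timer.call_after(
                GRACE, self._maybe_offline, (worker.hostname,))

        def _maybe_offline(self, hostname):
            # Re-check the event state once the grace period has passed.
            worker = self.c.gossip.state.workers.get(hostname)
            if worker is None or not worker.alive:
                print('%s still missing, treating as offline' % (hostname,))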