Ask Solem 8 years ago
parent
commit
67c1caed7f
4 changed files with 41 additions and 25 deletions
  1. 17 14
      celery/signals.py
  2. 9 1
      celery/tests/worker/test_heartbeat.py
  3. 7 2
      celery/worker/heartbeat.py
  4. 8 8
      docs/userguide/signals.rst

+ 17 - 14
celery/signals.py

@@ -16,15 +16,17 @@ from __future__ import absolute_import, unicode_literals
 
 from .utils.dispatch import Signal
 
-__all__ = ['before_task_publish', 'after_task_publish',
-           'task_prerun', 'task_postrun', 'task_success',
-           'task_retry', 'task_failure', 'task_revoked', 'celeryd_init',
-           'celeryd_after_setup', 'worker_init', 'worker_process_init',
-           'worker_ready', 'worker_shutdown', 'setup_logging',
-           'after_setup_logger', 'after_setup_task_logger',
-           'beat_init', 'beat_embedded_init', 'heartbeat',
-           'eventlet_pool_started', 'eventlet_pool_preshutdown',
-           'eventlet_pool_postshutdown', 'eventlet_pool_apply']
+__all__ = [
+    'before_task_publish', 'after_task_publish',
+    'task_prerun', 'task_postrun', 'task_success',
+    'task_retry', 'task_failure', 'task_revoked', 'celeryd_init',
+    'celeryd_after_setup', 'worker_init', 'worker_process_init',
+    'worker_ready', 'worker_shutdown', 'setup_logging',
+    'after_setup_logger', 'after_setup_task_logger',
+    'beat_init', 'beat_embedded_init', 'heartbeat',
+    'eventlet_pool_started', 'eventlet_pool_preshutdown',
+    'eventlet_pool_postshutdown', 'eventlet_pool_apply',
+]
 
 before_task_publish = Signal(providing_args=[
     'body', 'exchange', 'routing_key', 'headers', 'properties',
@@ -33,10 +35,6 @@ before_task_publish = Signal(providing_args=[
 after_task_publish = Signal(providing_args=[
     'body', 'exchange', 'routing_key',
 ])
-#: Deprecated, use after_task_publish instead.
-task_sent = Signal(providing_args=[
-    'task_id', 'task', 'args', 'kwargs', 'eta', 'taskset',
-])
 task_prerun = Signal(providing_args=['task_id', 'task', 'args', 'kwargs'])
 task_postrun = Signal(providing_args=[
     'task_id', 'task', 'args', 'kwargs', 'retval',
@@ -57,6 +55,11 @@ task_rejected = Signal(providing_args=[
 task_unknown = Signal(providing_args=[
     'message', 'exc', 'name', 'id',
 ])
+#: Deprecated, use after_task_publish instead.
+task_sent = Signal(providing_args=[
+    'task_id', 'task', 'args', 'kwargs', 'eta', 'taskset',
+])
+
 celeryd_init = Signal(providing_args=['instance', 'conf', 'options'])
 celeryd_after_setup = Signal(providing_args=['instance', 'conf'])
 import_modules = Signal(providing_args=[])
@@ -76,7 +79,7 @@ after_setup_task_logger = Signal(providing_args=[
 ])
 beat_init = Signal(providing_args=[])
 beat_embedded_init = Signal(providing_args=[])
-heartbeat = Signal(providing_args=[])
+heartbeat_sent = Signal(providing_args=[])
 eventlet_pool_started = Signal(providing_args=[])
 eventlet_pool_preshutdown = Signal(providing_args=[])
 eventlet_pool_postshutdown = Signal(providing_args=[])

+ 9 - 1
celery/tests/worker/test_heartbeat.py

@@ -1,7 +1,7 @@
 from __future__ import absolute_import, unicode_literals
 
 from celery.worker.heartbeat import Heart
-from celery.tests.case import AppCase
+from celery.tests.case import AppCase, Mock
 
 
 class MockDispatcher(object):
@@ -50,6 +50,14 @@ class test_Heart(AppCase):
         self.assertIsNone(h.tref)
         h.stop()
 
+    def test_send_sends_signal(self):
+        h = Heart(MockTimer(), MockDispatcher(), interval=1)
+        h._send_sent_signal = None
+        h._send('worker-heartbeat')
+        h._send_sent_signal = Mock(name='send_sent_signal')
+        h._send('worker')
+        h._send_sent_signal.assert_called_with(sender=h)
+
     def test_start_when_disabled(self):
         timer = MockTimer()
         eventer = MockDispatcher()

+ 7 - 2
celery/worker/heartbeat.py

@@ -9,7 +9,7 @@
 """
 from __future__ import absolute_import, unicode_literals
 
-from celery.signals import heartbeat
+from celery.signals import heartbeat_sent
 from celery.utils.sysinfo import load_average
 
 from .state import SOFTWARE_INFO, active_requests, all_total_count
@@ -37,8 +37,13 @@ class Heart(object):
         self.eventer.on_enabled.add(self.start)
         self.eventer.on_disabled.add(self.stop)
 
+        # Only send heartbeat_sent signal if it has receivers.
+        self._send_sent_signal = (
+            heartbeat_sent.send if heartbeat_sent.receivers else None)
+
     def _send(self, event):
-        heartbeat.send(sender=self)
+        if self._send_sent_signal is not None:
+            self._send_sent_signal(sender=self)
         return self.eventer.send(event, freq=self.interval,
                                  active=len(active_requests),
                                  processed=all_total_count[0],

+ 8 - 8
docs/userguide/signals.rst

@@ -488,6 +488,14 @@ Dispatched before the worker is started.
 
 Dispatched when the worker is ready to accept work.
 
+.. signal:: heartbeat_sent
+
+``heartbeat_sent``
+~~~~~~~~~~~~~~~~~~
+
+Dispatched when Celery sends a worker heartbeat.  Sender is the
+:class:`celery.worker.heartbeat.Heart` instance.
+
 .. signal:: worker_process_init
 
 ``worker_process_init``
@@ -548,14 +556,6 @@ Dispatched in addition to the :signal:`beat_init` signal when :program:`celery
 beat` is started as an embedded process.  Sender is the
 :class:`celery.beat.Service` instance.
 
-.. signal:: heartbeat
-
-``heartbeat``
-~~~~~~~~~~~~~
-
-Dispatched when Celery sends a worker heartbeat. Sender is the
-:class:`celery.worker.heartbeat.Heart` instance.
-
 Eventlet Signals
 ----------------