Selaa lähdekoodia

Add worker_shutting_down signal (#3998)

Daniel Huang 7 vuotta sitten
vanhempi
commit
19a7d8f925
4 muutettua tiedostoa jossa 42 lisäystä ja 2 poistoa
  1. 4 0
      celery/apps/worker.py
  2. 3 2
      celery/signals.py
  3. 21 0
      docs/userguide/signals.rst
  4. 14 0
      t/unit/bin/test_worker.py

+ 4 - 0
celery/apps/worker.py

@@ -277,6 +277,10 @@ def _shutdown_handler(worker, sig='TERM', how='Warm',
                 if callback:
                     callback(worker)
                 safe_say('worker: {0} shutdown (MainProcess)'.format(how))
+                signals.worker_shutting_down.send(
+                    sender=worker.hostname, sig=sig, how=how,
+                    exitcode=exitcode,
+                )
             if active_thread_count() > 1:
                 setattr(state, {'Warm': 'should_stop',
                                 'Cold': 'should_terminate'}[how], exitcode)

+ 3 - 2
celery/signals.py

@@ -19,8 +19,8 @@ __all__ = [
     'task_prerun', 'task_postrun', 'task_success',
     'task_retry', 'task_failure', 'task_revoked', 'celeryd_init',
     'celeryd_after_setup', 'worker_init', 'worker_process_init',
-    'worker_ready', 'worker_shutdown', 'setup_logging',
-    'after_setup_logger', 'after_setup_task_logger',
+    'worker_ready', 'worker_shutdown', 'worker_shutting_down',
+    'setup_logging', 'after_setup_logger', 'after_setup_task_logger',
     'beat_init', 'beat_embedded_init', 'heartbeat_sent',
     'eventlet_pool_started', 'eventlet_pool_preshutdown',
     'eventlet_pool_postshutdown', 'eventlet_pool_apply',
@@ -99,6 +99,7 @@ worker_process_init = Signal(name='worker_process_init')
 worker_process_shutdown = Signal(name='worker_process_shutdown')
 worker_ready = Signal(name='worker_ready')
 worker_shutdown = Signal(name='worker_shutdown')
+worker_shutting_down = Signal(name='worker_shutting_down')
 heartbeat_sent = Signal(name='heartbeat_sent')
 
 # - Logging

+ 21 - 0
docs/userguide/signals.rst

@@ -499,6 +499,27 @@ Dispatched when Celery sends a worker heartbeat.
 
 Sender is the :class:`celery.worker.heartbeat.Heart` instance.
 
+.. signal:: worker_shutting_down
+
+``worker_shutting_down``
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+Dispatched when the worker begins the shutdown process.
+
+Provides arguments:
+
+* ``sig``
+
+    The POSIX signal that was received.
+
+* ``how``
+
+    The shutdown method, warm or cold.
+
+* ``exitcode``
+
+    The exitcode that will be used when the main process exits.
+
 .. signal:: worker_process_init
 
 ``worker_process_init``

+ 14 - 0
t/unit/bin/test_worker.py

@@ -425,6 +425,7 @@ class test_funs:
 class test_signal_handlers:
 
     class _Worker(object):
+        hostname = 'foo'
         stopped = False
         terminated = False
 
@@ -642,3 +643,16 @@ class test_signal_handlers:
             handlers = self.psig(cd.install_worker_term_hard_handler, worker)
             with pytest.raises(WorkerTerminate):
                 handlers['SIGQUIT']('SIGQUIT', object())
+
+    def test_send_worker_shutting_down_signal(self):
+        with patch('celery.apps.worker.signals.worker_shutting_down') as wsd:
+            worker = self._Worker()
+            handlers = self.psig(cd.install_worker_term_handler, worker)
+            try:
+                with pytest.raises(WorkerShutdown):
+                    handlers['SIGTERM']('SIGTERM', object())
+            finally:
+                state.should_stop = None
+            wsd.send.assert_called_with(
+                sender='foo', sig='SIGTERM', how='Warm', exitcode=0,
+            )