Forráskód Böngészése

Adds new worker_process_shutdown signal (Closes #1595)

Dan 11 éve
szülő
commit
3497867ab9

+ 1 - 0
CONTRIBUTORS.txt

@@ -146,3 +146,4 @@ Matt Robenolt, 2013/08/31
 Jameel Al-Aziz, 2013/10/04
 Fazleev Maksim, 2013/10/08
 Ian A Wilson, 2013/10/18
+Daniel M Taub, 2013/10/22

+ 13 - 1
celery/concurrency/prefork.py

@@ -129,6 +129,18 @@ def process_initializer(app, hostname):
     signals.worker_process_init.send(sender=None)
 
 
+def process_destructor(pid=None, code=None):
+    """Pool child process destructor
+
+    This will log a debug message and fire off a signal so that
+    users can run custom cleanup code just before a worker process
+    exits
+
+    """
+    debug("Worker process with pid %i exited with code \"%s\".",pid,code)
+    signals.worker_process_shutdown.send(sender=None)
+
+
 def _select(readers=None, writers=None, err=None, timeout=0):
     """Simple wrapper to :class:`~select.select`.
 
@@ -183,7 +195,6 @@ class Worker(_pool.Worker):
         # is writable.
         self.outq.put((WORKER_UP, (pid, )))
 
-
 class ResultHandler(_pool.ResultHandler):
     """Handles messages from the pool processes."""
 
@@ -1004,6 +1015,7 @@ class TaskPool(BasePool):
                 else self.Pool)
         P = self._pool = Pool(processes=self.limit,
                               initializer=process_initializer,
+                              deinitializer=process_destructor,
                               synack=False,
                               **self.options)
 

+ 1 - 0
celery/signals.py

@@ -54,6 +54,7 @@ celeryd_init = Signal(providing_args=['instance', 'conf', 'options'])
 celeryd_after_setup = Signal(providing_args=['instance', 'conf'])
 worker_init = Signal(providing_args=[])
 worker_process_init = Signal(providing_args=[])
+worker_process_shutdown = Signal(providing_args=[])
 worker_ready = Signal(providing_args=[])
 worker_shutdown = Signal(providing_args=[])
 setup_logging = Signal(providing_args=[

+ 7 - 0
celery/tests/worker/test_worker.py

@@ -751,6 +751,13 @@ class test_WorkController(AppCase):
             self.worker._send_worker_shutdown()
             ws.send.assert_called_with(sender=self.worker)
 
+    def test_process_shutdown_on_worker_shutdown(self):
+        from celery.concurrency.processes import Worker, process_destructor
+        with patch('celery.signals.worker_process_shutdown') as ws:
+            Worker._make_shortcuts = Mock()
+            Worker.on_loop_stop(Worker(None,None,deinitializer=process_destructor),22,'foo')
+            ws.send.assert_called_with(sender=None)
+
     def test_process_task_revoked_release_semaphore(self):
         self.worker._quick_release = Mock()
         req = Mock()

+ 7 - 0
docs/userguide/signals.rst

@@ -371,6 +371,13 @@ worker_process_init
 
 Dispatched by each new pool worker process when it starts.
 
+.. signal:: worker_process_shutdown
+
+worker_process_shutdown
+~~~~~~~~~~~~~~~~~~~
+
+Dispatched by each new pool worker process when it is about to shut down.
+
 .. signal:: worker_shutdown
 
 worker_shutdown