Prechádzať zdrojové kódy

Cleanly shutdown the pool by catching TERM and calling close(), uncleanly shutdown the pool by catching INT and calling terminate()

Aaron Ross 15 rokov pred
rodič
commit
2cd7f229ff
4 zmenil súbory, kde vykonal 43 pridanie a 4 odobranie
  1. 1 1
      celery/beat.py
  2. 20 0
      celery/bin/celeryd.py
  3. 20 1
      celery/worker/__init__.py
  4. 2 2
      celery/worker/pool.py

+ 1 - 1
celery/beat.py

@@ -263,7 +263,7 @@ def EmbeddedClockService(*args, **kwargs):
 
         def stop(self):
             self.clockservice.stop()
-            self.terminate()
+            self.close()
 
     if kwargs.pop("thread", False):
         # Need short max interval to be able to stop thread

+ 20 - 0
celery/bin/celeryd.py

@@ -45,6 +45,9 @@ import optparse
 import traceback
 import multiprocessing
 
+from multiprocessing.process import current_process
+
+
 import celery
 from celery import conf
 from celery import signals
@@ -200,6 +203,7 @@ class Worker(object):
         # Install signal handler so SIGHUP restarts the worker.
         install_worker_restart_handler(worker)
         install_worker_term_handler(worker)
+        install_worker_int_handler(worker)
 
         signals.worker_init.send(sender=worker)
         try:
@@ -210,10 +214,26 @@ class Worker(object):
                         exc.__class__, exc, traceback.format_exc()))
 
 
+def install_worker_int_handler(worker):
+
+    def _stop(signum, frame):
+        if current_process().name == 'MainProcess':
+            worker.logger.warn("Hard stopping celeryd (%s)" % \
+                               (current_process().name))
+            worker.terminate()
+        raise SystemExit()
+
+    platform.install_signal_handler("SIGINT", _stop)
+
 def install_worker_term_handler(worker):
 
     def _stop(signum, frame):
+        if current_process().name == 'MainProcess':
+            worker.logger.warn("Stopping celeryd (%s)" % \
+                               (current_process().name))
+            worker.stop()
         raise SystemExit()
+
     platform.install_signal_handler("SIGTERM", _stop)
 
 def install_worker_restart_handler(worker):

+ 20 - 1
celery/worker/__init__.py

@@ -187,6 +187,25 @@ class WorkController(object):
             return
 
         signals.worker_shutdown.send(sender=self)
-        [component.stop() for component in reversed(self.components)]
+        for component in reversed(self.components):
+            self.logger.debug("Stopping thread %s..." % \
+                              component.__class__.__name__)
+            component.stop()
 
         self._state = "STOP"
+
+    def terminate(self):
+        """Not so gracefully shutdown the worker server."""
+        if self._state != "RUN":
+            return
+
+        signals.worker_shutdown.send(sender=self)
+        for component in reversed(self.components):
+            self.logger.debug("Terminating thread %s..." % \
+                              component.__class__.__name__)
+            if hasattr(component,'terminate'):
+                component.terminate()
+            else:
+                component.stop()
+
+            self._state = "STOP"

+ 2 - 2
celery/worker/pool.py

@@ -45,9 +45,9 @@ class TaskPool(object):
 
     def stop(self):
         """Terminate the pool."""
-        self._pool.terminate()
+        self._pool.close()
         self._pool.join()
-        self._pool = None
+        # self._pool = None
 
     def replace_dead_workers(self):
         self.logger.debug("TaskPool: Finding dead pool processes...")