Przeglądaj źródła

celeryd: Ctrl+C (SIGINT) once does warm shutdown, hitting Ctrl+C twice forces termination.

Ask Solem 15 lat temu
rodzic
commit
6d3273d9a9

+ 19 - 1
celery/bin/celeryd.py

@@ -307,17 +307,35 @@ class Worker(object):
 
 def install_worker_int_handler(worker):
 
+    def _stop(signum, frame):
+        process_name = multiprocessing.current_process().name
+        if process_name == "MainProcess":
+            worker.logger.warn(
+                "celeryd: Hitting Ctrl+C again will terminate "
+                "all running tasks!")
+            install_worker_int_again_handler(worker)
+            worker.logger.warn("celeryd: Warm shutdown (%s)" % (
+                process_name))
+            worker.stop()
+        raise SystemExit()
+
+    platform.install_signal_handler("SIGINT", _stop)
+
+
+def install_worker_int_again_handler(worker):
+
     def _stop(signum, frame):
         process_name = multiprocessing.current_process().name
         if process_name == "MainProcess":
             worker.logger.warn("celeryd: Cold shutdown (%s)" % (
-                                    process_name))
+                process_name))
             worker.terminate()
         raise SystemExit()
 
     platform.install_signal_handler("SIGINT", _stop)
 
 
+
 def install_worker_term_handler(worker):
 
     def _stop(signum, frame):

+ 7 - 1
celery/concurrency/processes/__init__.py

@@ -53,12 +53,18 @@ class TaskPool(object):
                           maxtasksperchild=self.maxtasksperchild)
 
     def stop(self):
-        """Terminate the pool."""
+        """Gracefully stop the pool."""
         if self._pool is not None and self._pool._state == RUN:
             self._pool.close()
             self._pool.join()
             self._pool = None
 
+    def terminate(self):
+        """Force terminate the pool."""
+        if self._pool is not None:
+            self._pool.terminate()
+            self._pool = None
+
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
             errbacks=None, accept_callback=None, timeout_callback=None,
             **compat):

+ 24 - 21
celery/worker/__init__.py

@@ -20,6 +20,10 @@ from celery.utils import noop, instantiate
 from celery.worker.buckets import TaskBucket, FastQueue
 from celery.worker.scheduler import Scheduler
 
+RUN = 0x1
+CLOSE = 0x2
+TERMINATE = 0x3
+
 
 def process_initializer():
     # There seems to a bug in multiprocessing (backport?)
@@ -182,7 +186,7 @@ class WorkController(object):
 
     def start(self):
         """Starts the workers main loop."""
-        self._state = "RUN"
+        self._state = RUN
 
         try:
             for i, component in enumerate(self.components):
@@ -206,32 +210,31 @@ class WorkController(object):
             self.stop()
 
     def stop(self):
-        """Gracefully shutdown the worker server."""
-        if self._state != "RUN":
-            return
-        if self._running != len(self.components):
-            return
+        """Graceful shutdown of the worker server."""
+        self._shutdown(warm=True)
 
-        signals.worker_shutdown.send(sender=self)
-        for component in reversed(self.components):
-            self.logger.debug("Stopping thread %s..." % (
-                              component.__class__.__name__))
-            component.stop()
+    def terminate(self):
+        """Not so graceful shutdown of the worker server."""
+        self._shutdown(warm=False)
 
-        self.listener.close_connection()
-        self._state = "STOP"
+    def _shutdown(self, warm=True):
+        """Gracefully shutdown the worker server."""
+        what = (warm and "stopping" or "terminating").capitalize()
 
-    def terminate(self):
-        """Not so gracefully shutdown the worker server."""
-        if self._state != "RUN":
+        if self._state != RUN or self._running != len(self.components):
+            # Not fully started, can safely exit.
             return
 
+        self._state = CLOSE
         signals.worker_shutdown.send(sender=self)
+
         for component in reversed(self.components):
-            self.logger.debug("Terminating thread %s..." % (
-                              component.__class__.__name__))
-            terminate = getattr(component, "terminate", component.stop)
-            terminate()
+            self.logger.debug("%s thread %s..." % (
+                    what, component.__class__.__name__))
+            stop = component.stop
+            if not warm:
+                stop = getattr(component, "terminate", stop)
+            stop()
 
         self.listener.close_connection()
-        self._state = "STOP"
+        self._state = TERMINATE