Browse Source

Can't call worker.stop/terminate from signal handler if running with eventlet

Ask Solem 14 years ago
parent
commit
cc57a3e8ec

+ 5 - 3
celery/apps/worker.py

@@ -12,7 +12,7 @@ from celery import __version__
 from celery import platforms
 from celery import signals
 from celery.app import app_or_default
-from celery.exceptions import ImproperlyConfigured
+from celery.exceptions import ImproperlyConfigured, SystemTerminate
 from celery.utils import get_full_cls_name, LOG_LEVELS, isatty
 from celery.utils import term
 from celery.worker import WorkController
@@ -259,6 +259,7 @@ def install_worker_int_handler(worker):
             install_worker_int_again_handler(worker)
             worker.logger.warn("celeryd: Warm shutdown (%s)" % (
                 process_name))
+            worker.stop(in_sighandler=True)
         raise SystemExit()
 
     platforms.install_signal_handler("SIGINT", _stop)
@@ -271,6 +272,7 @@ def install_worker_int_again_handler(worker):
         if process_name == "MainProcess":
             worker.logger.warn("celeryd: Cold shutdown (%s)" % (
                 process_name))
+            worker.terminate(in_sighandler=True)
         raise SystemTerminate()
 
     platforms.install_signal_handler("SIGINT", _stop)
@@ -283,7 +285,7 @@ def install_worker_term_handler(worker):
         if process_name == "MainProcess":
             worker.logger.warn("celeryd: Warm shutdown (%s)" % (
                 process_name))
-            worker.stop()
+            worker.stop(in_sighandler=True)
         raise SystemExit()
 
     platforms.install_signal_handler("SIGTERM", _stop)
@@ -295,7 +297,7 @@ def install_worker_restart_handler(worker):
         """Signal handler restarting the current python program."""
         worker.logger.warn("Restarting celeryd (%s)" % (
             " ".join(sys.argv)))
-        worker.stop()
+        worker.stop(in_sighandler=True)
         os.execv(sys.executable, [sys.executable] + sys.argv)
 
     platforms.install_signal_handler("SIGHUP", restart_worker_sig_handler)

+ 2 - 0
celery/concurrency/base.py

@@ -18,6 +18,8 @@ class BasePool(object):
     CLOSE = 0x2
     TERMINATE = 0x3
 
+    signal_safe = True
+
     _state = None
     _pool = None
 

+ 4 - 1
celery/concurrency/evg.py

@@ -5,9 +5,12 @@ from celery.concurrency.base import apply_target, BasePool
 
 
 class TaskPool(BasePool):
+    Pool = Pool
+
+    signal_safe = False
 
     def on_start(self):
-        self._pool = Pool(self.limit)
+        self._pool = self.Pool(self.limit)
 
     def on_stop(self):
         if self._pool is not None:

+ 4 - 0
celery/concurrency/evlet.py

@@ -7,6 +7,8 @@ from celery.concurrency.base import apply_target, BasePool
 class TaskPool(BasePool):
     Pool = GreenPool
 
+    signal_safe = False
+
     def on_start(self):
         self._pool = self.Pool(self.limit)
 
@@ -25,5 +27,7 @@ class TaskPool(BasePool):
     @classmethod
     def on_import(cls):
         import eventlet
+        import eventlet.debug
         eventlet.monkey_patch()
+        eventlet.debug.hub_prevent_multiple_readers(False)
 TaskPool.on_import()

+ 21 - 9
celery/worker/__init__.py

@@ -232,11 +232,19 @@ class WorkController(object):
         """Starts the workers main loop."""
         self._state = RUN
 
-        for i, component in enumerate(self.components):
-            self.logger.debug("Starting thread %s..." % (
-                                    component.__class__.__name__))
-            self._running = i + 1
-            self.pool.blocking(component.start)
+        try:
+            for i, component in enumerate(self.components):
+                self.logger.debug("Starting thread %s..." % (
+                                        component.__class__.__name__))
+                self._running = i + 1
+                self.pool.blocking(component.start)
+        except SystemTerminate:
+            self.terminate()
+            raise SystemExit()
+        except (SystemExit, KeyboardInterrupt), exc:
+            self.stop()
+            raise exc
+
 
     def process_task(self, wrapper):
         """Process task by sending it to the pool of workers."""
@@ -250,16 +258,20 @@ class WorkController(object):
         except SystemTerminate:
             self.terminate()
             raise SystemExit()
-        except (SystemExit, KeyboardInterrupt):
+        except (SystemExit, KeyboardInterrupt), exc:
             self.stop()
-            raise SystemExit()
+            raise exc
 
-    def stop(self):
+    def stop(self, in_sighandler=False):
         """Graceful shutdown of the worker server."""
+        if in_sighandler and not self.pool.signal_safe:
+            return
         self.pool.blocking(self._shutdown, warm=True)
 
-    def terminate(self):
+    def terminate(self, in_sighandler=False):
         """Not so graceful shutdown of the worker server."""
+        if in_sighandler and not self.pool.signal_safe:
+            return
         self.pool.blocking(self._shutdown, warm=False)
 
     def _shutdown(self, warm=True):