Browse Source

Merge branch 'aaronelliotross/master'

Conflicts:
	celery/worker/__init__.py
Ask Solem 15 years ago
parent
commit
fa0a4d4763
2 changed files with 32 additions and 1 deletions
  1. 12 0
      celery/bin/celeryd.py
  2. 20 1
      celery/worker/__init__.py

+ 12 - 0
celery/bin/celeryd.py

@@ -216,6 +216,7 @@ class Worker(object):
         # Install signal handler so SIGHUP restarts the worker.
         install_worker_restart_handler(worker)
         install_worker_term_handler(worker)
+        install_worker_int_handler(worker)
 
         signals.worker_init.send(sender=worker)
         try:
@@ -226,6 +227,17 @@ class Worker(object):
                         exc.__class__, exc, traceback.format_exc()))
 
 
+def install_worker_int_handler(worker):
+
+    def _stop(signum, frame):
+        if multiprocessing.current_process().name == 'MainProcess':
+            worker.logger.warn("celeryd: Cold shutdown (%s)" % \
+                               (current_process().name))
+            worker.terminate()
+        raise SystemExit()
+
+    platform.install_signal_handler("SIGINT", _stop)
+
 def install_worker_term_handler(worker):
 
     def _stop(signum, frame):

+ 20 - 1
celery/worker/__init__.py

@@ -192,6 +192,25 @@ class WorkController(object):
             return
 
         signals.worker_shutdown.send(sender=self)
-        [component.stop() for component in reversed(self.components)]
+        for component in reversed(self.components):
+            self.logger.debug("Stopping thread %s..." % (
+                              component.__class__.__name__))
+            component.stop()
+
+        self.listener.close_connection()
+        self._state = "STOP"
+
+    def terminate(self):
+        """Not so gracefully shutdown the worker server."""
+        if self._state != "RUN":
+            return
+
+        signals.worker_shutdown.send(sender=self)
+        for component in reversed(self.components):
+            self.logger.debug("Terminating thread %s..." % (
+                              component.__class__.__name__))
+            terminate = getattr(component, "terminate", component.stop)
+            terminate()
+
         self.listener.close_connection()
         self._state = "STOP"