浏览代码

Try to restart when receiving SIGHUP (issue #26) Seems to not always work when
running detached.)

Ask Solem 15 年之前
父节点
当前提交
637b4d887b
共有 2 个文件被更改,包括 31 次插入3 次删除
  1. 27 0
      celery/bin/celeryd.py
  2. 4 3
      celery/worker/__init__.py

+ 27 - 0
celery/bin/celeryd.py

@@ -80,6 +80,7 @@ from celery import conf
 from celery import discovery
 from celery import discovery
 from celery.task import discard_all
 from celery.task import discard_all
 from celery.worker import WorkController
 from celery.worker import WorkController
+from signal import signal, SIGHUP
 import multiprocessing
 import multiprocessing
 import traceback
 import traceback
 import optparse
 import optparse
@@ -254,11 +255,16 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
     # (Usually imports task modules and such.)
     # (Usually imports task modules and such.)
     current_loader.on_worker_init()
     current_loader.on_worker_init()
 
 
+
     def run_worker():
     def run_worker():
         worker = WorkController(concurrency=concurrency,
         worker = WorkController(concurrency=concurrency,
                                 loglevel=loglevel,
                                 loglevel=loglevel,
                                 logfile=logfile,
                                 logfile=logfile,
                                 is_detached=detach)
                                 is_detached=detach)
+
+        # Install signal handler that restarts celeryd on SIGHUP
+        install_restart_signal_handler(worker)
+
         try:
         try:
             worker.start()
             worker.start()
         except Exception, e:
         except Exception, e:
@@ -276,6 +282,27 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         raise
         raise
 
 
 
 
+def install_restart_signal_handler(worker):
+    """Installs a signal handler that restarts the current program
+    when it receives the ``SIGHUP`` signal.
+    """
+
+    def restart_self(signum, frame):
+        """Signal handler restarting the current python program."""
+        worker.logger.info("Restarting celeryd (%s)" % (
+            " ".join(sys.argv)))
+        if worker.is_detached:
+            pid = os.fork()
+            if pid:
+                worker.stop()
+                sys.exit(0)
+        else:
+            worker.stop()
+        os.execve(sys.executable, [sys.executable] + sys.argv, os.environ)
+
+    signal(SIGHUP, restart_self)
+
+
 def parse_options(arguments):
 def parse_options(arguments):
     """Parse the available options to ``celeryd``."""
     """Parse the available options to ``celeryd``."""
     parser = optparse.OptionParser(option_list=OPTION_LIST)
     parser = optparse.OptionParser(option_list=OPTION_LIST)

+ 4 - 3
celery/worker/__init__.py

@@ -5,7 +5,7 @@ The Multiprocessing Worker Server
 Documentation for this module is in ``docs/reference/celery.worker.rst``.
 Documentation for this module is in ``docs/reference/celery.worker.rst``.
 
 
 """
 """
-from carrot.connection import DjangoBrokerConnection
+from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
 from celery.worker.controllers import Mediator, PeriodicWorkController
 from celery.worker.controllers import Mediator, PeriodicWorkController
 from celery.worker.job import TaskWrapper
 from celery.worker.job import TaskWrapper
 from celery.registry import NotRegistered
 from celery.registry import NotRegistered
@@ -65,8 +65,7 @@ class AMQPListener(object):
             self.reset_connection()
             self.reset_connection()
             try:
             try:
                 self.consume_messages()
                 self.consume_messages()
-            except (socket.error,
-                    self.amqp_connection.ConnectionException):
+            except (socket.error, AMQPConnectionException):
                 self.logger.error("AMQPListener: Connection to broker lost. "
                 self.logger.error("AMQPListener: Connection to broker lost. "
                                 + "Trying to re-establish connection...")
                                 + "Trying to re-establish connection...")
 
 
@@ -299,3 +298,5 @@ class WorkController(object):
             return
             return
 
 
         [component.stop() for component in reversed(self.components)]
         [component.stop() for component in reversed(self.components)]
+
+        self._state = "STOP"