소스 검색

Handle SIGTERM, embedded celerybeat should not be daemon=True.

Ask Solem 15 년 전
부모
커밋
e429a776d7
3개의 변경된 파일15개의 추가작업 그리고 3개의 파일을 삭제
  1. 7 3
      celery/beat.py
  2. 7 0
      celery/bin/celeryd.py
  3. 1 0
      celery/worker/pool.py

+ 7 - 3
celery/beat.py

@@ -13,6 +13,7 @@ from UserDict import UserDict
 from celery import log
 from celery import log
 from celery import conf
 from celery import conf
 from celery import registry as _registry
 from celery import registry as _registry
+from celery import platform
 from celery.utils.info import humanize_seconds
 from celery.utils.info import humanize_seconds
 
 
 
 
@@ -175,13 +176,16 @@ class ClockService(object):
         self.debug = log.SilenceRepeated(self.logger.debug,
         self.debug = log.SilenceRepeated(self.logger.debug,
                                          max_iterations=silence)
                                          max_iterations=silence)
 
 
-    def start(self):
+    def start(self, embedded_process=False):
         self.logger.info("ClockService: Starting...")
         self.logger.info("ClockService: Starting...")
         self.logger.debug("ClockService: "
         self.logger.debug("ClockService: "
             "Ticking with max interval->%s, schedule->%s" % (
             "Ticking with max interval->%s, schedule->%s" % (
                     humanize_seconds(self.max_interval),
                     humanize_seconds(self.max_interval),
                     self.schedule_filename))
                     self.schedule_filename))
 
 
+        if embedded_process:
+            platform.set_process_title("celerybeat")
+
         try:
         try:
             while True:
             while True:
                 if self._shutdown.isSet():
                 if self._shutdown.isSet():
@@ -253,13 +257,13 @@ def EmbeddedClockService(*args, **kwargs):
         def __init__(self, *args, **kwargs):
         def __init__(self, *args, **kwargs):
             super(_Process, self).__init__()
             super(_Process, self).__init__()
             self.clockservice = ClockService(*args, **kwargs)
             self.clockservice = ClockService(*args, **kwargs)
-            self.daemon = True
 
 
         def run(self):
         def run(self):
-            self.clockservice.start()
+            self.clockservice.start(embedded_process=True)
 
 
         def stop(self):
         def stop(self):
             self.clockservice.stop()
             self.clockservice.stop()
+            self.terminate()
 
 
     if kwargs.pop("thread", False):
     if kwargs.pop("thread", False):
         # Need short max interval to be able to stop thread
         # Need short max interval to be able to stop thread

+ 7 - 0
celery/bin/celeryd.py

@@ -199,6 +199,7 @@ class Worker(object):
 
 
         # Install signal handler so SIGHUP restarts the worker.
         # Install signal handler so SIGHUP restarts the worker.
         install_worker_restart_handler(worker)
         install_worker_restart_handler(worker)
+        install_worker_term_handler(worker)
 
 
         signals.worker_init.send(sender=worker)
         signals.worker_init.send(sender=worker)
         try:
         try:
@@ -209,6 +210,12 @@ class Worker(object):
                         exc.__class__, exc, traceback.format_exc()))
                         exc.__class__, exc, traceback.format_exc()))
 
 
 
 
+def install_worker_term_handler(worker):
+
+    def _stop(signum, frame):
+        raise SystemExit()
+    platform.install_signal_handler("SIGTERM", _stop)
+
 def install_worker_restart_handler(worker):
 def install_worker_restart_handler(worker):
 
 
     def restart_worker_sig_handler(signum, frame):
     def restart_worker_sig_handler(signum, frame):

+ 1 - 0
celery/worker/pool.py

@@ -46,6 +46,7 @@ class TaskPool(object):
     def stop(self):
     def stop(self):
         """Terminate the pool."""
         """Terminate the pool."""
         self._pool.terminate()
         self._pool.terminate()
+        self._pool.join()
         self._pool = None
         self._pool = None
 
 
     def replace_dead_workers(self):
     def replace_dead_workers(self):