
Added a one_for_all supervisor which restarts the WorkController if it stops

Ask Solem, 16 years ago
commit 07058f793b
2 changed files with 98 additions and 9 deletions
  1. celery/bin/celeryd.py (+14, -9)
  2. celery/supervisor.py (+84, -0)

celery/bin/celeryd.py (+14, -9)

@@ -71,6 +71,7 @@ if django_project_dir:
 
 from django.conf import settings
 from celery import __version__
+from celery.supervisor import OFASupervisor
 from celery.log import emergency_error
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
@@ -248,18 +249,22 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         context.open()
 
     discovery.autodiscover()
-    worker = WorkController(concurrency=concurrency,
-                            loglevel=loglevel,
-                            logfile=logfile,
-                            is_detached=detach)
 
-    try:
-        worker.run()
-    except Exception, e:
-        emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
+    def run_worker():
+        worker = WorkController(concurrency=concurrency,
+                                loglevel=loglevel,
+                                logfile=logfile,
+                                is_detached=detach)
+        try:
+            worker.run()
+        except Exception, e:
+            emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
                             e.__class__, e, traceback.format_exc()))
+
+    try:
+        OFASupervisor(target=run_worker).start()
     except:
-        if daemon:
+        if detach:
             context.close()
         raise
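
The worker bootstrap is now wrapped in a closure and handed to OFASupervisor, which re-runs the target in a fresh process whenever the supervised process dies. A minimal sketch of the same pattern in isolation (boot is a hypothetical stand-in for WorkController(...).run(), and the example assumes celery.supervisor is importable):

    import time

    from celery.supervisor import OFASupervisor


    def boot():
        # Hypothetical stand-in for the worker main loop; if the process
        # running it dies, the supervisor spawns a replacement.
        while True:
            time.sleep(1)


    if __name__ == "__main__":
        OFASupervisor(target=boot).start()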
 

celery/supervisor.py (+84, -0)

@@ -0,0 +1,84 @@
+from multiprocessing import Process, TimeoutError
+import threading
+import time
+
+PING_TIMEOUT = 30  # seconds
+JOIN_TIMEOUT = 2  # seconds
+CHECK_INTERVAL = 2  # seconds between liveness checks
+MAX_RESTART_FREQ = 3  # max restarts tolerated ...
+MAX_RESTART_FREQ_TIME = 10  # ... within this many seconds
+
+
+def raise_ping_timeout():
+    raise TimeoutError("Supervised: Timed out while pinging process.")
+
+
+class OFASupervisor(object):
+    """Process supervisor using the `one_for_all`_ strategy.
+   
+    .. _`one_for_all`:
+        http://erlang.org/doc/design_principles/sup_princ.html#5.3.2
+    
+    """
+
+    def __init__(self, target, args=None, kwargs=None,
+            ping_timeout=PING_TIMEOUT, join_timeout=JOIN_TIMEOUT,
+            max_restart_freq_time=MAX_RESTART_FREQ_TIME,
+            max_restart_freq=MAX_RESTART_FREQ,
+            check_interval=CHECK_INTERVAL):
+        self.target = target
+        self.args = args or []
+        self.kwargs = kwargs or {}
+        self.ping_timeout = ping_timeout
+        self.join_timeout = join_timeout
+        self.check_interval = check_interval
+        self.max_restart_freq = max_restart_freq
+        self.max_restart_freq_time = max_restart_freq_time
+        self.restarts_in_frame = 0
+
+    def start(self):
+        target = self.target
+
+        def _start_supervised_process():
+            process = Process(target=target,
+                              args=self.args, kwargs=self.kwargs)
+            process.start()
+            return process
+
+        def _restart(process):
+            # Force-terminate the old process and hand back a fresh one.
+            process.join(timeout=self.join_timeout)
+            process.terminate()
+            self.restarts_in_frame += 1
+            return _start_supervised_process()
+
+        try:
+            process = _start_supervised_process()
+            restart_frame = 0
+            while True:
+                if restart_frame > self.max_restart_freq_time:
+                    # Frame elapsed: give up if it saw too many restarts,
+                    # otherwise open a new frame.
+                    if self.restarts_in_frame >= self.max_restart_freq:
+                        raise Exception(
+                                "Supervised: Max restart frequency reached")
+                    restart_frame = 0
+                    self.restarts_in_frame = 0
+
+                try:
+                    proc_is_alive = self._is_alive(process)
+                except TimeoutError:
+                    proc_is_alive = False
+
+                if not proc_is_alive:
+                    process = _restart(process)
+
+                time.sleep(self.check_interval)
+                restart_frame += self.check_interval
+        finally:
+            process.join()
+        
+    def _is_alive(self, process):
+        # Arm a watchdog for the liveness check and cancel it once the
+        # check has returned.
+        timeout_timer = threading.Timer(self.ping_timeout, raise_ping_timeout)
+        timeout_timer.start()
+        try:
+            alive = process.is_alive()
+        finally:
+            timeout_timer.cancel()
+        return alive
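
With the defaults above, the supervisor polls the child every CHECK_INTERVAL seconds and gives up once a MAX_RESTART_FREQ_TIME-second frame has seen MAX_RESTART_FREQ or more restarts. A rough demonstration of that guard, assuming the frame handling in start() shown above (crashy is made up for illustration):

    from celery.supervisor import OFASupervisor


    def crashy():
        # Exits immediately, so every liveness check triggers a restart.
        raise RuntimeError("simulated worker crash")


    if __name__ == "__main__":
        try:
            OFASupervisor(target=crashy, check_interval=1,
                          max_restart_freq=3,
                          max_restart_freq_time=5).start()
        except Exception, exc:
            print("Supervisor gave up: %s" % (exc, ))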