
Merge branch 'master' into nodjango

Conflicts:
	celery/worker.py
Ask Solem 16 years ago
Commit
b189e4f8d4
5 changed files with 168 additions and 13 deletions
  1. celery/bin/celeryd.py    + 26 - 10
  2. celery/loader/django.py  + 1 - 1
  3. celery/supervisor.py     + 139 - 0
  4. celery/task.py           + 1 - 1
  5. celery/worker.py         + 1 - 1

+ 26 - 10
celery/bin/celeryd.py

@@ -30,6 +30,10 @@
 
     Run in the background as a daemon.
 
+.. cmdoption:: -S, --supervised
+
+    Restart the worker server if it dies.
+
 .. cmdoption:: --discard
 
     Discard all waiting tasks before the daemon is started.
@@ -71,6 +75,7 @@ if django_project_dir:
 
 from django.conf import settings
 from celery import __version__
+from celery.supervisor import OFASupervisor
 from celery.log import emergency_error
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
@@ -123,6 +128,9 @@ OPTION_LIST = (
     optparse.make_option('-d', '--detach', '--daemon', default=False,
             action="store_true", dest="detach",
             help="Run in the background as a daemon."),
+    optparse.make_option('-S', '--supervised', default=False,
+            action="store_true", dest="supervised",
+            help="Restart the worker server if it dies."),
     optparse.make_option('-u', '--uid', default=None,
             action="store", dest="uid",
             help="User-id to run celeryd as when in daemon mode."),
@@ -176,7 +184,8 @@ def acquire_pidlock(pidfile):
 def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
         pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
-        working_directory=None, chroot=None, statistics=None, **kwargs):
+        supervised=False, working_directory=None, chroot=None,
+        statistics=None, **kwargs):
     """Starts the celery worker server."""
 
     print("Celery %s is starting." % __version__)
@@ -248,18 +257,25 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         context.open()
 
     discovery.autodiscover()
-    worker = WorkController(concurrency=concurrency,
-                            loglevel=loglevel,
-                            logfile=logfile,
-                            is_detached=detach)
 
-    try:
-        worker.run()
-    except Exception, e:
-        emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
+    def run_worker():
+        worker = WorkController(concurrency=concurrency,
+                                loglevel=loglevel,
+                                logfile=logfile,
+                                is_detached=detach)
+        try:
+            worker.run()
+        except Exception, e:
+            emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
                             e.__class__, e, traceback.format_exc()))
+
+    try:
+        if supervised:
+            OFASupervisor(target=run_worker).start()
+        else:
+            run_worker()
     except:
-        if daemon:
+        if detach:
             context.close()
         raise
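
The net effect of the celeryd.py hunk: with --supervised, the worker
entry point is handed to OFASupervisor, which runs it in a child process
and restarts it whenever it dies; without the flag it is called directly.
A minimal sketch of that branch in isolation (start_worker is a
hypothetical stand-in for the nested run_worker closure):

    from celery.supervisor import OFASupervisor

    def start_worker():
        # Hypothetical stand-in: build a WorkController and run it
        # until it exits or raises.
        pass

    def launch(supervised=False):
        if supervised:
            # The child process is restarted whenever it dies.
            OFASupervisor(target=start_worker).start()
        else:
            start_worker()

On the command line this corresponds to celeryd --supervised (or the
short form celeryd -S).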
 

+ 1 - 1
celery/loader/django.py

@@ -6,7 +6,7 @@ class Loader(object):
 
     def read_configuration(self):
         from django.conf import settings
-        return {}
+        return settings
 
     def on_task_init(self, task_id, task):
         # See: http://groups.google.com/group/django-users/browse_thread/
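
Returning settings instead of an empty dict matters because an empty
dict has no attributes, so every configuration lookup fell through to
celery's hard-coded defaults. A self-contained sketch of the difference
(the class and setting name are invented for illustration):

    class DjangoLikeSettings(object):
        """Stand-in for django.conf.settings."""
        CELERY_LOG_LEVEL = "DEBUG"   # hypothetical project override

    def read_configuration_old():
        return {}                    # lookups always miss

    def read_configuration_new():
        return DjangoLikeSettings()  # lookups see project overrides

    conf_old = read_configuration_old()
    conf_new = read_configuration_new()
    print(getattr(conf_old, "CELERY_LOG_LEVEL", "INFO"))  # "INFO": default wins
    print(getattr(conf_new, "CELERY_LOG_LEVEL", "INFO"))  # "DEBUG": override seen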

+ 139 - 0
celery/supervisor.py

@@ -0,0 +1,139 @@
+from multiprocessing import Process, TimeoutError
+import threading
+import time
+
+PING_TIMEOUT = 30 # seconds
+JOIN_TIMEOUT = 2
+CHECK_INTERVAL = 2
+MAX_RESTART_FREQ = 3
+MAX_RESTART_FREQ_TIME = 10
+
+
+def raise_ping_timeout():
+    raise TimeoutError("Supervised: Timed out while pinging process.")
+
+
+class OFASupervisor(object):
+    """Process supervisor using the `one_for_all`_ strategy.
+
+    .. _`one_for_all`:
+        http://erlang.org/doc/design_principles/sup_princ.html#5.3.2
+
+    However, instead of registering a list of processes, you have one
+    process which runs a pool. This makes for an easy implementation.
+
+    :param target: see :attr:`target`.
+    :param args: see :attr:`args`.
+    :param kwargs: see :attr:`kwargs`.
+    :param join_timeout: see :attr:`join_timeout`.
+    :param max_restart_freq: see :attr:`max_restart_freq`.
+    :param max_restart_freq_time: see :attr:`max_restart_freq_time`.
+    :param check_interval: see :attr:`check_interval`.
+
+    .. attribute:: target
+
+        The target callable to be launched in a new process.
+
+    .. attribute:: args
+
+        The positional arguments to apply to :attr:`target`.
+
+    .. attribute:: kwargs
+
+        The keyword arguments to apply to :attr:`target`.
+
+    .. attribute:: join_timeout
+
+        If the process is dead, try to give it a few seconds to join.
+
+    .. attribute:: max_restart_freq
+
+        Limit the number of restarts which can occur in a given time interval.
+
+        The max restart frequency is the number of restarts that can occur
+        within the interval :attr:`max_restart_freq_time`.
+
+        The restart mechanism prevents situations where the process
+        repeatedly dies for the same reason. If this happens, both the
+        process and the supervisor are terminated.
+
+    .. attribute:: max_restart_freq_time
+
+        See :attr:`max_restart_freq`.
+
+    .. attribute:: check_interval
+
+        The time in seconds, between process pings.
+
+    """
+
+    def __init__(self, target, args=None, kwargs=None,
+            ping_timeout=PING_TIMEOUT, join_timeout=JOIN_TIMEOUT,
+            max_restart_freq=MAX_RESTART_FREQ,
+            max_restart_freq_time=MAX_RESTART_FREQ_TIME,
+            check_interval=CHECK_INTERVAL):
+        self.target = target
+        self.args = args or []
+        self.kwargs = kwargs or {}
+        self.ping_timeout = ping_timeout
+        self.join_timeout = join_timeout
+        self.check_interval = check_interval
+        self.max_restart_freq = max_restart_freq
+        self.max_restart_freq_time = max_restart_freq_time
+        self.restarts_in_frame = 0
+
+    def start(self):
+        """Launches the :attr:`target` in a seperate process and starts
+        supervising it."""
+        target = self.target
+
+        def _start_supervised_process():
+            """Start the :attr:`target` in a new process."""
+            process = Process(target=target,
+                              args=self.args, kwargs=self.kwargs)
+            process.start()
+            return process
+
+        def _restart(process):
+            """Terminate the process and start a new one in its place."""
+            process.join(timeout=self.join_timeout)
+            process.terminate()
+            self.restarts_in_frame += 1
+            return _start_supervised_process()
+
+        try:
+            process = _start_supervised_process()
+            restart_frame = 0
+            while True:
+                if restart_frame > self.max_restart_freq_time:
+                    if self.restarts_in_frame >= self.max_restart_freq:
+                        raise Exception(
+                                "Supervised: Max restart frequency reached")
+                    restart_frame = 0
+                    self.restarts_in_frame = 0
+
+                try:
+                    proc_is_alive = self._is_alive(process)
+                except TimeoutError:
+                    proc_is_alive = False
+
+                if not proc_is_alive:
+                    process = _restart(process)
+
+                time.sleep(self.check_interval)
+                restart_frame += self.check_interval
+        finally:
+            process.join()
+
+    def _is_alive(self, process):
+        """Sends a ping to the target process to see if it's alive.
+
+        :rtype: bool
+
+        """
+        timeout_timer = threading.Timer(self.ping_timeout, raise_ping_timeout)
+        timeout_timer.start()
+        try:
+            alive = process.is_alive()
+        finally:
+            timeout_timer.cancel()
+        return alive
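
A minimal usage sketch for the new supervisor, assuming a target
function that is importable at module level (a multiprocessing
requirement on platforms that spawn instead of fork); flaky_worker and
its behaviour are invented for illustration:

    import time
    from celery.supervisor import OFASupervisor

    def flaky_worker(name):
        # Hypothetical target: works briefly, then dies.
        print("%s: working" % name)
        time.sleep(2)
        raise SystemExit("simulated crash")

    if __name__ == "__main__":
        # The supervisor checks the child every check_interval seconds
        # and restarts it on death, until more than max_restart_freq
        # restarts happen within a max_restart_freq_time-second window.
        OFASupervisor(target=flaky_worker, args=["worker1"],
                      check_interval=2, max_restart_freq=3,
                      max_restart_freq_time=10).start()

Note that start() blocks: it only returns by raising once the restart
frequency limit is exceeded.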

+ 1 - 1
celery/task.py

@@ -12,7 +12,7 @@ from celery.registry import tasks
 from datetime import timedelta
 from celery.backends import default_backend
 from celery.result import AsyncResult, TaskSetResult
-from django.utils.functional import curry
+from functools import partial as curry
 import uuid
 import pickle
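
The swap works because functools.partial provides the same pre-binding
behaviour celery used django.utils.functional.curry for: both return a
callable with some arguments fixed in advance, which removes the Django
dependency from this module. A quick illustration (add is an arbitrary
example function):

    from functools import partial as curry

    def add(x, y, z):
        return x + y + z

    add10 = curry(add, 10)   # fix the first positional argument
    print(add10(20, 30))     # 60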
 

+ 1 - 1
celery/worker.py

@@ -66,7 +66,7 @@ def jail(task_id, task_name, func, args, kwargs):
     ignore_result = getattr(func, "ignore_result", False)
     timer_stat = TaskTimerStats.start(task_id, task_name, args, kwargs)
 
-     Backend process cleanup
+    # Backend process cleanup
     default_backend.process_cleanup()
 
     try: