소스 검색

Removed the --supervised feature, it was not working well, use supervisord
instead (http://supervisord.org/)

Ask Solem 16 년 전
부모
커밋
2714647ddd
6개의 변경된 파일3개의 추가작업 그리고 218개의 파일을 삭제
  1. 2 13
      celery/bin/celeryd.py
  2. 0 120
      celery/supervisor.py
  3. 0 66
      celery/tests/test_supervisor.py
  4. 0 8
      docs/reference/celery.fields.rst
  5. 0 8
      docs/reference/celery.supervisor.rst
  6. 1 3
      docs/reference/index.rst

+ 2 - 13
celery/bin/celeryd.py

@@ -35,10 +35,6 @@
 
     Run in the background as a daemon.
 
-.. cmdoption:: -S, --supervised
-
-    Restart the worker server if it dies.
-
 .. cmdoption:: --discard
 
     Discard all waiting tasks before the daemon is started.
@@ -83,7 +79,6 @@ from celery.worker import WorkController
 from celery.loaders import current_loader, settings
 from celery.loaders import current_loader
 from celery.loaders import settings
-from celery.supervisor import OFASupervisor
 
 USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
 # Make sure the setting exists.
@@ -128,9 +123,6 @@ OPTION_LIST = (
     optparse.make_option('-d', '--detach', '--daemon', default=False,
             action="store_true", dest="detach",
             help="Run in the background as a daemon."),
-    optparse.make_option('-S', '--supervised', default=False,
-            action="store_true", dest="supervised",
-            help="Restart the worker server if it dies."),
     optparse.make_option('-u', '--uid', default=None,
             action="store", dest="uid",
             help="User-id to run celeryd as when in daemon mode."),
@@ -152,7 +144,7 @@ OPTION_LIST = (
 def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
         loglevel=conf.DAEMON_LOG_LEVEL, logfile=conf.DAEMON_LOG_FILE,
         discard=False, pidfile=conf.DAEMON_PID_FILE, umask=0,
-        uid=None, gid=None, supervised=False, working_directory=None,
+        uid=None, gid=None, working_directory=None,
         chroot=None, statistics=None, run_clockservice=False, **kwargs):
     """Starts the celery worker server."""
 
@@ -245,10 +237,7 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
                             e.__class__, e, traceback.format_exc()))
 
     try:
-        if supervised:
-            OFASupervisor(target=run_worker).start()
-        else:
-            run_worker()
+        run_worker()
     except:
         if detach:
             context.close()

+ 0 - 120
celery/supervisor.py

@@ -1,120 +0,0 @@
-import time
-import multiprocessing
-from multiprocessing import TimeoutError
-
-JOIN_TIMEOUT = 2
-CHECK_INTERVAL = 2
-MAX_RESTART_FREQ = 3
-MAX_RESTART_FREQ_TIME = 10
-
-
-class MaxRestartsExceededError(Exception):
-    """Restarts exceeded the maximum restart frequency."""
-
-
-class OFASupervisor(object):
-    """Process supervisor using the `one_for_all`_ strategy.
-
-    .. _`one_for_all`:
-        http://erlang.org/doc/design_principles/sup_princ.html#5.3.2
-
-    However, instead of registering a list of processes, you have one
-    process which runs a pool. Makes for an easy implementation.
-
-    :param target: see :attr:`target`.
-    :param args: see :attr:`args`.
-    :param kwargs: see :attr:`kwargs`.
-    :param max_restart_freq: see :attr:`max_restart_freq`.
-    :param max_restart_freq_time: see :attr:`max_restart_freq_time`.
-    :param check_interval: see :attr:`max_restart_freq_time`.
-
-    .. attribute:: target
-
-        The target callable to be launched in a new process.
-
-    .. attribute:: args
-
-        The positional arguments to apply to :attr:`target`.
-
-    .. attribute:: kwargs
-
-        The keyword arguments to apply to :attr:`target`.
-
-    .. attribute:: max_restart_freq
-
-        Limit the number of restarts which can occur in a given time interval.
-
-        The max restart frequency is the number of restarts that can occur
-        within the interval :attr:`max_restart_freq_time`.
-
-        The restart mechanism prevents situations where the process repeatedly
-        dies for the same reason. If this happens both the process and the
-        supervisor is terminated.
-
-    .. attribute:: max_restart_freq_time
-
-        See :attr:`max_restart_freq`.
-
-    .. attribute:: check_interval
-
-        The time in seconds, between process pings.
-
-    """
-    Process = multiprocessing.Process
-
-    def __init__(self, target, args=None, kwargs=None,
-            max_restart_freq=MAX_RESTART_FREQ,
-            join_timeout=JOIN_TIMEOUT,
-            max_restart_freq_time=MAX_RESTART_FREQ_TIME,
-            check_interval=CHECK_INTERVAL):
-        self.target = target
-        self.join_timeout = join_timeout
-        self.args = args or []
-        self.kwargs = kwargs or {}
-        self.check_interval = check_interval
-        self.max_restart_freq = max_restart_freq
-        self.max_restart_freq_time = max_restart_freq_time
-        self.restarts_in_frame = 0
-
-    def start(self):
-        """Launches the :attr:`target` in a seperate process and starts
-        supervising it."""
-        target = self.target
-
-        def _start_supervised_process():
-            """Start the :attr:`target` in a new process."""
-            process = self.Process(target=target,
-                                   args=self.args, kwargs=self.kwargs)
-            process.start()
-            return process
-
-        def _restart(process):
-            """Terminate the process and restart."""
-            process.join(timeout=self.join_timeout)
-            process.terminate()
-            self.restarts_in_frame += 1
-            process = _start_supervised_process()
-
-        process = _start_supervised_process()
-        try:
-            restart_frame = 0
-            while True:
-                if restart_frame > self.max_restart_freq_time:
-                    if self.restarts_in_frame >= self.max_restart_freq:
-                        raise MaxRestartsExceededError(
-                                "Supervised: Max restart frequency reached")
-                restart_frame = 0
-                self.restarts_in_frame = 0
-
-                try:
-                    proc_is_alive = process.is_alive()
-                except TimeoutError:
-                    proc_is_alive = False
-
-                if not proc_is_alive:
-                    _restart(process)
-
-                time.sleep(self.check_interval)
-                restart_frame += self.check_interval
-        finally:
-            process.join()

+ 0 - 66
celery/tests/test_supervisor.py

@@ -1,66 +0,0 @@
-import unittest
-from celery.supervisor import OFASupervisor
-from celery.supervisor import TimeoutError, MaxRestartsExceededError
-
-
-def target_one(x, y, z):
-    return x * y * z
-
-
-class MockProcess(object):
-    _started = False
-    _stopped = False
-    _terminated = False
-    _joined = False
-    alive = True
-    timeout_on_is_alive = False
-
-    def __init__(self, target, args, kwargs):
-        self.target = target
-        self.args = args
-        self.kwargs = kwargs
-
-    def start(self):
-        self._stopped = False
-        self._started = True
-
-    def stop(self):
-        self._stopped = True
-        self._started = False
-
-    def terminate(self):
-        self._terminated = False
-
-    def is_alive(self):
-        if self._started and self.alive:
-            if self.timeout_on_is_alive:
-                raise TimeoutError("Supervised: timed out.")
-            return True
-        return False
-
-    def join(self, timeout=None):
-        self._joined = True
-
-
-class TestOFASupervisor(unittest.TestCase):
-
-    def test_init(self):
-        s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={})
-        s.Process = MockProcess
-
-    def test_start(self):
-        MockProcess.alive = False
-        s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={},
-                          max_restart_freq=0, max_restart_freq_time=0)
-        s.Process = MockProcess
-        self.assertRaises(MaxRestartsExceededError, s.start)
-        MockProcess.alive = True
-
-    def test_start_is_alive_timeout(self):
-        MockProcess.alive = True
-        MockProcess.timeout_on_is_alive = True
-        s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={},
-                          max_restart_freq=0, max_restart_freq_time=0)
-        s.Process = MockProcess
-        self.assertRaises(MaxRestartsExceededError, s.start)
-        MockProcess.timeout_on_is_alive = False

+ 0 - 8
docs/reference/celery.fields.rst

@@ -1,8 +0,0 @@
-===============================
- Django Fields - celery.fields
-===============================
-
-.. currentmodule:: celery.fields
-
-.. automodule:: celery.fields
-    :members:

+ 0 - 8
docs/reference/celery.supervisor.rst

@@ -1,8 +0,0 @@
-========================================
- Process Supervisor - celery.supervisor
-========================================
-
-.. currentmodule:: celery.supervisor
-
-.. automodule:: celery.supervisor
-    :members:

+ 1 - 3
docs/reference/index.rst

@@ -9,7 +9,7 @@
     :maxdepth: 2
 
     celery.task.base
-    celery.execute 
+    celery.execute
     celery.result
     celery.task
     celery.registry
@@ -29,7 +29,6 @@
     celery.worker.job
     celery.worker.controllers
     celery.pool
-    celery.supervisor
     celery.backends
     celery.backends.base
     celery.backends.database
@@ -44,6 +43,5 @@
     celery.views
     celery.managers
     celery.models
-    celery.fields
     celery.bin.celeryd
     celery.bin.celeryinit