
Implements PoolSupervisor

Ask Solem 15 years ago
commit 980e8717e4
4 changed files with 119 additions and 44 deletions
  1. celery/managers.py (+5 -0)
  2. celery/pool.py (+113 -27)
  3. celery/supervisor.py (+1 -6)
  4. celery/tests/test_pool.py (+0 -11)

+ 5 - 0
celery/managers.py

@@ -3,6 +3,7 @@ from django.db import models
 from django.db import connection
 from celery.registry import tasks
 from datetime import datetime, timedelta
+from django.conf import settings
 import random
 
 # server_drift can be negative, but timedelta supports addition on
@@ -58,6 +59,8 @@ class PeriodicTaskManager(models.Manager):
 
     def lock(self):
         """Lock the periodic task table for reading."""
+        if settings.DATABASE_ENGINE != "mysql":
+            return
         cursor = connection.cursor()
         table = self.model._meta.db_table
         cursor.execute("LOCK TABLES %s READ" % table)
@@ -66,6 +69,8 @@ class PeriodicTaskManager(models.Manager):
 
     def unlock(self):
         """Unlock the periodic task table."""
+        if settings.DATABASE_ENGINE != "mysql":
+            return
         cursor = connection.cursor()
         table = self.model._meta.db_table
         cursor.execute("UNLOCK TABLES")

+ 113 - 27
celery/pool.py

@@ -5,11 +5,113 @@ Process Pools.
 """
 import multiprocessing
+import time
 
-from multiprocessing.pool import Pool
+from multiprocessing.pool import Pool, worker
 from celery.datastructures import ExceptionInfo
 from celery.utils import gen_unique_id
 from functools import partial as curry
+from celery.supervisor import MaxRestartsExceededError
 
+MAX_RESTART_FREQ = 10
+MAX_RESTART_FREQ_TIME = 60
+
+
+class DynamicPool(Pool):
+    """Version of :class:`multiprocessing.Pool` that can dynamically grow
+    in size."""
+
+    def __init__(self, processes=None, initializer=None, initargs=()):
+        super(DynamicPool, self).__init__(processes=processes,
+                                          initializer=initializer,
+                                          initargs=initargs)
+        self._initializer = initializer
+        self._initargs = initargs
+
+    def add_worker(self):
+        """Add another worker to the pool."""
+        w = self.Process(target=worker,
+                         args=(self._inqueue, self._outqueue,
+                               self._initializer, self._initargs))
+        self._pool.append(w)
+        w.name = w.name.replace("Process", "PoolWorker")
+        w.daemon = True
+        w.start()
+
+    def grow(self, size=1):
+        """Add ``size`` new workers to the pool."""
+        map(self.add_worker, range(size))
+
+    def get_worker_pids(self):
+        """Returns the process id's of all the pool workers."""
+        return [process.pid for process in self.processes]
+
+    def replace_dead_workers(self):
+        """Replace dead workers in the pool with fresh ones, returning
+        the list of processes that died."""
+        dead = [process for process in self.processes
+                            if not process.is_alive()]
+        if dead:
+            dead_pids = [process.pid for process in dead]
+            self._pool = [process for process in self._pool
+                            if process.pid not in dead_pids]
+            self.grow(len(dead))
+
+        return dead
+
+    @property
+    def processes(self):
+        """The list of worker processes in the pool."""
+        return self._pool
+
+
+class PoolSupervisor(object):
+    """Supervisor implementing the "one_for_one" strategy.
+
+    :param target: See :attr:`target`.
+    :param max_restart_freq: See :attr:`max_restart_freq`.
+    :param max_restart_freq_time: See :attr:`max_restart_freq_time`.
+
+    .. attribute:: target
+
+        The target pool to supervise.
+
+    .. attribute:: max_restart_freq
+
+        Limit the number of restarts which can occur in a given time interval.
+
+        The max restart frequency is the number of restarts that can occur
+        within the interval :attr:`max_restart_freq_time`.
+
+        The restart mechanism prevents situations where the process repeatedly
+        dies for the same reason. If this happens, both the process and the
+        supervisor are terminated.
+
+    .. attribute:: max_restart_freq_time
+
+        See :attr:`max_restart_freq`.
+
+    """
+
+    def __init__(self, target, max_restart_freq=MAX_RESTART_FREQ,
+                               max_restart_freq_time=MAX_RESTART_FREQ_TIME):
+        self.target = target
+        self.max_restart_freq = max_restart_freq * len(target.processes)
+        self.max_restart_freq_time = max_restart_freq_time
+        self.restart_frame_time = None
+        self.restarts_in_frame = 0
+
+    def restart_freq_exceeded(self):
+        if not self.restart_frame_time:
+            self.restart_frame_time = time.time()
+            return False
+        time_exceeded = time.time() > self.restart_frame_time + \
+                self.max_restart_freq_time
+        if time_exceeded and self.restarts_in_frame >= self.max_restart_freq:
+            return True
+        return False
+
+    def supervise(self):
+        dead = self.target.replace_dead_workers()
+        if dead:
+            self.restarts_in_frame += len(dead)
+            if self.restart_freq_exceeded():
+                raise MaxRestartsExceededError(
+                    "Pool supervisor: Max restart frequencey exceeded.")
+                    
 
 class TaskPool(object):
     """Process Pool for processing tasks in parallel.
@@ -32,7 +134,7 @@ class TaskPool(object):
         self.limit = limit
         self.logger = logger or multiprocessing.get_logger()
         self._pool = None
-        self._processes = None
+        self._supervisor = None
 
     def start(self):
         """Run the task pool.
@@ -40,13 +142,12 @@ class TaskPool(object):
         Will pre-fork all workers so they're ready to accept tasks.
 
         """
-        self._processes = {}
-        self._pool = Pool(processes=self.limit)
+        self._pool = DynamicPool(processes=self.limit)
+        self._supervisor = PoolSupervisor(self._pool)
 
     def stop(self):
         """Terminate the pool."""
         self._pool.terminate()
-        self._processes = {}
         self._pool = None
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
@@ -62,42 +163,27 @@ class TaskPool(object):
         callbacks = callbacks or []
         errbacks = errbacks or []
         meta = meta or {}
-        tid = gen_unique_id()
 
-        on_return = curry(self.on_return, tid, callbacks, errbacks,
+        on_return = curry(self.on_return, callbacks, errbacks,
                           on_ack, meta)
 
+        self._supervisor.supervise()
+
         self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
             target, args, kwargs))
-        result = self._pool.apply_async(target, args, kwargs,
-                                        callback=on_return)
 
-        self._processes[tid] = [result, callbacks, errbacks, meta]
+        return self._pool.apply_async(target, args, kwargs,
+                                      callback=on_return)
 
-        return result
 
-    def on_return(self, tid, callbacks, errbacks, on_ack, meta, ret_value):
+    def on_return(self, callbacks, errbacks, on_ack, meta, ret_value):
         """What to do when the process returns."""
 
         # Acknowledge the task as being processed.
         if on_ack:
             on_ack()
 
-        try:
-            del(self._processes[tid])
-        except KeyError:
-            pass
-        else:
-            self.on_ready(callbacks, errbacks, meta, ret_value)
-
-    def full(self):
-        """Is the pool full?
-
-        :returns: ``True`` if the maximum number of concurrent processes
-            has been reached.
-
-        """
-        return len(self._processes.values()) >= self.limit
+        self.on_ready(callbacks, errbacks, meta, ret_value)
 
     def get_worker_pids(self):
         """Returns the process id's of all the pool workers."""

+ 1 - 6
celery/supervisor.py

@@ -24,7 +24,6 @@ class OFASupervisor(object):
     :param target: see :attr:`target`.
     :param args: see :attr:`args`.
     :param kwargs: see :attr:`kwargs`.
-    :param join_timeout: see :attr:`join_timeout`.
     :param max_restart_freq: see :attr:`max_restart_freq`.
     :param max_restart_freq_time: see :attr:`max_restart_freq_time`.
     :param check_interval: see :attr:`check_interval`.
@@ -41,10 +40,6 @@ class OFASupervisor(object):
 
         The keyword arguments to apply to :attr:`target`.
 
-    .. attribute:: join_timeout
-
-        If the process is dead, try to give it a few seconds to join.
-
     .. attribute:: max_restart_freq
 
         Limit the number of restarts which can occur in a given time interval.
@@ -69,7 +64,7 @@ class OFASupervisor(object):
 
     def __init__(self, target, args=None, kwargs=None,
             join_timeout=JOIN_TIMEOUT,
-            max_restart_freq = MAX_RESTART_FREQ,
+            max_restart_freq=MAX_RESTART_FREQ,
             max_restart_freq_time=MAX_RESTART_FREQ_TIME,
             check_interval=CHECK_INTERVAL):
         self.target = target
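
For contrast, OFASupervisor applies the same one_for_one idea to a single target callable rather than to a pool of workers. A usage sketch based on the signature above (the start() entry point and the run_worker target are assumptions for illustration, not part of this commit):

    from celery.supervisor import OFASupervisor

    def run_worker():
        pass  # long-running process body goes here

    supervisor = OFASupervisor(target=run_worker)
    supervisor.start()  # blocks, restarting the target whenever it dies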

+ 0 - 11
celery/tests/test_pool.py

@@ -30,16 +30,13 @@ class TestTaskPool(unittest.TestCase):
         self.assertEquals(p.limit, 2)
         self.assertTrue(isinstance(p.logger, logging.Logger))
         self.assertTrue(p._pool is None)
-        self.assertTrue(p._processes is None)
 
     def x_start_stop(self):
         p = TaskPool(limit=2)
         p.start()
         self.assertTrue(p._pool)
-        self.assertTrue(isinstance(p._processes, dict))
         p.stop()
         self.assertTrue(p._pool is None)
-        self.assertFalse(p._processes)
 
     def x_apply(self):
         p = TaskPool(limit=2)
@@ -95,14 +92,6 @@ class TestTaskPool(unittest.TestCase):
 
         p.stop()
 
-    def test_is_full(self):
-        p = TaskPool(2)
-        p.start()
-        self.assertFalse(p.full())
-        results = [p.apply_async(long_something) for i in xrange(4)]
-        self.assertTrue(p.full())
-        p.stop()
-
     def test_get_worker_pids(self):
         p = TaskPool(5)
         p.start()
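
With full() and _processes gone there is nothing left to assert about pool saturation, but the new worker-replacement path could be exercised directly. A sketch of such a test, following the file's x_ naming for disabled tests (the kill/sleep timings are illustrative, not part of this commit):

    import os
    import signal
    import time

    def x_replace_dead_workers(self):
        p = TaskPool(limit=2)
        p.start()
        victim = p.get_worker_pids()[0]
        os.kill(victim, signal.SIGKILL)        # simulate a crashed worker
        time.sleep(0.5)                        # give the OS time to reap it
        dead = p._pool.replace_dead_workers()  # DynamicPool respawns one
        self.assertTrue(dead)
        self.assertEquals(len(p.get_worker_pids()), 2)
        p.stop()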