浏览代码

Cleanup pool

Ask Solem 15 年之前
父节点
当前提交
80c24afffe
共有 2 个文件被更改,包括 11 次插入88 次删除
  1. 11 82
      celery/pool.py
  2. 0 6
      celery/tests/test_pool.py

+ 11 - 82
celery/pool.py

@@ -11,9 +11,6 @@ from celery.datastructures import ExceptionInfo
 from celery.utils import gen_unique_id
 from functools import partial as curry
 
-MAX_RESTART_FREQ = 10
-MAX_RESTART_FREQ_TIME = 60
-
 
 class DynamicPool(Pool):
     """Version of :class:`multiprocessing.Pool` that can dynamically grow
@@ -40,10 +37,6 @@ class DynamicPool(Pool):
         """Add ``size`` new workers to the pool."""
         map(self._add_worker, range(size))
 
-    def get_worker_pids(self):
-        """Returns the process id's of all the pool workers."""
-        return [process.pid for process in self.processes]
-
     def is_alive(self, process):
         try:
             proc_is_alive = process.is_alive()
@@ -53,7 +46,7 @@ class DynamicPool(Pool):
             return proc_is_alive
 
     def replace_dead_workers(self):
-        dead = [process for process in self.processes
+        dead = [process for process in self._pool
                             if not self.is_alive(process)]
         if dead:
             dead_pids = [process.pid for process in dead]
@@ -63,73 +56,6 @@ class DynamicPool(Pool):
         
         return dead
 
-    @property
-    def processes(self):
-        return self._pool
-
-
-class PoolSupervisor(object):
-    """Supervisor implementing the "one_for_one" strategy.
-
-    :param target: See :attr:`target`.
-    :param max_restart_freq: See :attr:`max_restart_freq`.
-    :param max_restart_freq_time: See :attr:`max_restart_freq_time`.
-
-    .. attribute:: target
-
-        The target pool to supervise.
-
-    .. attribute:: max_restart_freq
-
-        Limit the number of restarts which can occur in a given time interval.
-
-        The max restart frequency is the number of restarts that can occur
-        within the interval :attr:`max_restart_freq_time`.
-
-        The restart mechanism prevents situations where the process repeatedly
-        dies for the same reason. If this happens both the process and the
-        supervisor is terminated.
-
-    .. attribute:: max_restart_freq_time
-
-        See :attr:`max_restart_freq`.
-
-    """
-
-    def __init__(self, target, logger=None,
-            max_restart_freq=MAX_RESTART_FREQ,
-            max_restart_freq_time=MAX_RESTART_FREQ_TIME):
-        self.logger = logger or multiprocessing.get_logger()
-        self.target = target
-        self.max_restart_freq = max_restart_freq * len(target.processes)
-        self.max_restart_freq_time = max_restart_freq_time
-        self.restart_frame_time = None
-        self.restarts_in_frame = 0
-
-    def restart_freq_exceeded(self):
-        if not self.restart_frame_time:
-            self.restart_frame_time = time.time()
-            return False
-        time_exceeded = time.time() > self.max_restart_frame_time + \
-                self.max_restart_freq_time
-        if time_exceeded:
-            if self.restarts_in_frame >= self.max_restart_freq:
-                return True
-            self.restart_frame_time = time.time()
-        return False
-          
-    def supervise(self):
-        self.logger.debug("PoolSupervisor: Finding dead worker processes...")
-        dead = self.target.replace_dead_workers()
-        if dead:
-            self.logger.info(
-                "PoolSupervisor: Replaced %s dead pool workers..." % (
-                    len(dead)))
-            self.restarts_in_frame += len(dead)
-            if self.restart_freq_exceeded():
-                raise MaxRestartsExceededError(
-                    "Pool supervisor: Max restart frequencey exceeded.")
-                    
 
 class TaskPool(object):
     """Process Pool for processing tasks in parallel.
@@ -152,7 +78,6 @@ class TaskPool(object):
         self.limit = limit
         self.logger = logger or multiprocessing.get_logger()
         self._pool = None
-        self._supervisor = None
 
     def start(self):
         """Run the task pool.
@@ -161,13 +86,20 @@ class TaskPool(object):
 
         """
         self._pool = DynamicPool(processes=self.limit)
-        self._supervisor = PoolSupervisor(self._pool)
 
     def stop(self):
         """Terminate the pool."""
         self._pool.terminate()
         self._pool = None
 
+    def replace_dead_workers(self):
+        self.logger.debug("TaskPool: Finding dead pool processes...")
+        dead = self._pool.replace_dead_workers()
+        if dead:
+            self.logger.info(
+                "TaskPool: Replaced %s dead pool workers..." % (
+                    len(dead)))
+
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
             errbacks=None, on_ack=None, meta=None):
         """Equivalent of the :func:``apply`` built-in function.
@@ -185,11 +117,12 @@ class TaskPool(object):
         on_return = curry(self.on_return, callbacks, errbacks,
                           on_ack, meta)
 
-        self._supervisor.supervise()
 
         self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
             target, args, kwargs))
 
+        self.replace_dead_workers()
+
         return self._pool.apply_async(target, args, kwargs,
                                         callback=on_return)
 
@@ -203,10 +136,6 @@ class TaskPool(object):
 
         self.on_ready(callbacks, errbacks, meta, ret_value)
 
-    def get_worker_pids(self):
-        """Returns the process id's of all the pool workers."""
-        return [process.pid for process in self._pool._pool]
-
     def on_ready(self, callbacks, errbacks, meta, ret_value):
         """What to do when a worker task is ready and its return value has
         been collected."""

+ 0 - 6
celery/tests/test_pool.py

@@ -91,9 +91,3 @@ class TestTaskPool(unittest.TestCase):
         self.assertEquals(scratchpad[3]["meta"], {"foo4": "bar4"})
 
         p.stop()
-
-    def test_get_worker_pids(self):
-        p = TaskPool(5)
-        p.start()
-        self.assertEquals(len(p.get_worker_pids()), 5)
-        p.stop()