Browse Source

Let multiprocessing.Pool deal with reaping of processes.

Ask Solem 15 years ago
parent
commit
45e33fbf32
1 changed files with 2 additions and 37 deletions
  1. 2 37
      celery/pool.py

+ 2 - 37
celery/pool.py

@@ -15,8 +15,7 @@ from functools import partial as curry
 
 
 class TaskPool(object):
-    """Pool of running child processes, which starts waiting for the
-    processes to finish when the queue limit has been reached.
+    """Process Pool for processing tasks in parallel.
 
     :param limit: see :attr:`limit` attribute.
     :param logger: see :attr:`logger` attribute.
@@ -24,8 +23,7 @@ class TaskPool(object):
 
     .. attribute:: limit
 
-        The number of processes that can run simultaneously until
-        we start collecting results.
+        The number of processes that can run simultaneously.
 
     .. attribute:: logger
 
@@ -36,8 +34,6 @@ class TaskPool(object):
     def __init__(self, limit, logger=None):
         self.limit = limit
         self.logger = logger or multiprocessing.get_logger()
-        self._process_counter = itertools.count(1)
-        self._processed_total = 0
         self._pool = None
         self._processes = None
 
@@ -71,13 +67,8 @@ class TaskPool(object):
         meta = meta or {}
         tid = gen_unique_id()
 
-        self._processed_total = self._process_counter.next()
-
         on_return = curry(self.on_return, tid, callbacks, errbacks, meta)
 
-        if self.full():
-            self.wait_for_result()
-
         result = self._pool.apply_async(target, args, kwargs,
                                         callback=on_return)
         if on_acknowledge:
@@ -105,32 +96,6 @@ class TaskPool(object):
         """
         return len(self._processes.values()) >= self.limit
 
-    def wait_for_result(self):
-        """Waits for the first process in the pool to finish.
-
-        This operation is blocking.
-
-        """
-        while True:
-            if self.reap():
-                break
-
-    def reap(self):
-        """Reap finished tasks."""
-        self.logger.debug("Reaping processes...")
-        processes_reaped = 0
-        for process_no, entry in enumerate(self._processes.items()):
-            tid, process_info = entry
-            result, callbacks, errbacks, meta = process_info
-            try:
-                ret_value = result.get(timeout=0.3)
-            except multiprocessing.TimeoutError:
-                continue
-            else:
-                self.on_return(tid, callbacks, errbacks, meta, ret_value)
-                processes_reaped += 1
-        return processes_reaped
-
     def get_worker_pids(self):
         """Returns the process id's of all the pool workers."""
         return [process.pid for process in self._pool._pool]