Don't want an eager argument to apply_async after all

Ask Solem
commit b8125af692
2 changed files with 45 additions and 8 deletions
  1. celery/execute.py (+0 -4)
  2. celery/pool.py (+45 -4)

+ 0 - 4
celery/execute.py

@@ -50,10 +50,6 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,

     :keyword priority: The task priority, a number between ``0`` and ``9``.

-    :keyword eager: Don't actually send the task to the worker servers,
-        but execute it locally at once. This will block until the execution
-        is finished and return an :class:`celery.result.EagerResult` instance.
-
     """
     args = args or []
     kwargs = kwargs or {}

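
With the ``eager`` flag gone, the old behaviour amounts to calling the task
callable directly in the current process. A minimal sketch of that idea (the
``apply_locally`` helper is hypothetical, not part of celery; the removed flag
additionally wrapped the return value in ``celery.result.EagerResult``):

    def apply_locally(task, args=None, kwargs=None):
        # Hypothetical helper: roughly what the removed ``eager`` flag did.
        # Blocks until the task finishes instead of sending it to a worker
        # server, and returns the bare value rather than an EagerResult.
        args = args or []
        kwargs = kwargs or {}
        return task(*args, **kwargs)
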
+ 45 - 4
celery/pool.py

@@ -6,13 +6,54 @@ Process Pools.
 import multiprocessing
 import itertools
 import threading
-import uuid

+from multiprocessing.pool import Pool, worker
 from multiprocessing.pool import RUN as POOL_STATE_RUN
 from celery.datastructures import ExceptionInfo
+from celery.utils import gen_unique_id
 from functools import partial as curry


+class DynamicPool(Pool):
+    """Version of :class:`multiprocessing.Pool` that can dynamically grow
+    in size."""
+
+    def __init__(self, processes=None, initializer=None, initargs=()):
+        super(DynamicPool, self).__init__(processes=processes,
+                                          initializer=initializer,
+                                          initargs=initargs)
+        self._initializer = initializer
+        self._initargs = initargs
+
+    def add_worker(self):
+        """Add another worker to the pool."""
+        w = self.Process(target=worker,
+                         args=(self._inqueue, self._outqueue,
+                               self._initializer, self._initargs))
+        self._pool.append(w)
+        w.name = w.name.replace("Process", "PoolWorker")
+        w.daemon = True
+        w.start()
+
+    def grow(self, size=1):
+        """Add ``size`` new workers to the pool."""
+        [self.add_worker() for i in xrange(size)]
+
+    def get_worker_pids(self):
+        """Returns the process ids of all the pool workers."""
+        return [process.pid for process in self._pool]
+
+    def reap_dead_workers(self):
+        dead = [process for process in self._pool
+                            if not process.is_alive()]
+        self._pool = [process for process in self._pool
+                            if process not in dead]
+        return dead
+
+    def replace_dead_workers(self):
+        self.grow(len(self.reap_dead_workers()))
+
+
 class TaskPool(object):
     """Pool of running child processes, which starts waiting for the
     processes to finish when the queue limit has been reached.
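
A minimal usage sketch of the ``DynamicPool`` added above (pool sizes are
illustrative; the snippet is not part of this commit):

    from celery.pool import DynamicPool

    pool = DynamicPool(processes=2)
    pool.grow(2)                    # spawn two more PoolWorker processes
    print(pool.get_worker_pids())   # pids of every process in the pool
    # reap_dead_workers() drops crashed processes from the pool and returns
    # them; replace_dead_workers() grows the pool by one worker per reaped
    # process, restoring it to full strength.
    pool.replace_dead_workers()
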
@@ -47,7 +88,7 @@ class TaskPool(object):

         """
         self._processes = {}
-        self._pool = multiprocessing.Pool(processes=self.limit)
+        self._pool = DynamicPool(processes=self.limit)

     def stop(self):
         """Terminate the pool."""
@@ -68,7 +109,7 @@ class TaskPool(object):
         callbacks = callbacks or []
         errbacks = errbacks or []
         meta = meta or {}
-        tid = str(uuid.uuid4())
+        tid = gen_unique_id()

         self._processed_total = self._process_counter.next()

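
``gen_unique_id`` stands in for the inline ``str(uuid.uuid4())`` call it
replaces; a sketch of an equivalent helper (the actual ``celery.utils``
implementation may differ):

    import uuid

    def gen_unique_id():
        # A random uuid4 rendered as a string, suitable for use as a task
        # id; equivalent to the expression this commit replaces.
        return str(uuid.uuid4())
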
@@ -132,7 +173,7 @@ class TaskPool(object):

     def get_worker_pids(self):
         """Returns the process ids of all the pool workers."""
-        return [process.pid for process in self._pool._pool]
+        return self._pool.get_worker_pids()

     def on_ready(self, callbacks, errbacks, meta, ret_value):
         """What to do when a worker task is ready and its return value has