Browse Source

Refactored celery.concurrency to use a common base class. Also improved the eventlet pool

Ask Solem 14 years ago
parent
commit
73b117f3bc

+ 117 - 0
celery/concurrency/base.py

@@ -0,0 +1,117 @@
+import sys
+import traceback
+
+from celery import log
+from celery.datastructures import ExceptionInfo
+from celery.utils.functional import partial
+
+
+def apply_target(target, args=(), kwargs={}, callback=None,
+        accept_callback=None):
+    if accept_callback:
+        accept_callback()
+    callback(target(*args, **kwargs))
+
+
+class BasePool(object):
+    RUN = 0x1
+    CLOSE = 0x2
+    TERMINATE = 0x3
+
+    _state = None
+    _pool = None
+
+    def __init__(self, limit=None, putlocks=True, logger=None, **options):
+        self.limit = limit
+        self.putlocks = putlocks
+        self.logger = logger or log.get_default_logger()
+        self.options = options
+
+    def on_start(self):
+        pass
+
+    def on_stop(self):
+        pass
+
+    def on_apply(self, *args, **kwargs):
+        pass
+
+    def stop(self):
+        self._state = self.CLOSE
+        self.on_stop()
+        self._state = self.TERMINATE
+
+    def terminate(self):
+        self._state = self.TERMINATE
+        self.on_terminate()
+
+    def start(self):
+        self.on_start()
+        self._state = self.RUN
+
+    def apply_async(self, target, args=None, kwargs=None, callbacks=None,
+            errbacks=None, accept_callback=None, timeout_callback=None,
+            **compat):
+        """Equivalent of the :func:`apply` built-in function.
+
+        All `callbacks` and `errbacks` should complete immediately since
+        otherwise the thread which handles the result will get blocked.
+
+        """
+        args = args or []
+        kwargs = kwargs or {}
+        callbacks = callbacks or []
+        errbacks = errbacks or []
+
+        on_ready = partial(self.on_ready, callbacks, errbacks)
+        on_worker_error = partial(self.on_worker_error, errbacks)
+
+        self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
+            target, args, kwargs))
+
+        return self.on_apply(target, args, kwargs,
+                             callback=on_ready,
+                             accept_callback=accept_callback,
+                             timeout_callback=timeout_callback,
+                             error_callback=on_worker_error,
+                             waitforslot=self.putlocks)
+
+
+    def on_ready(self, callbacks, errbacks, ret_value):
+        """What to do when a worker task is ready and its return value has
+        been collected."""
+
+        if isinstance(ret_value, ExceptionInfo):
+            if isinstance(ret_value.exception, (
+                    SystemExit, KeyboardInterrupt)):
+                raise ret_value.exception
+            [self.safe_apply_callback(errback, ret_value)
+                    for errback in errbacks]
+        else:
+            [self.safe_apply_callback(callback, ret_value)
+                    for callback in callbacks]
+
+    def on_worker_error(self, errbacks, exc):
+        einfo = ExceptionInfo((exc.__class__, exc, None))
+        [errback(einfo) for errback in errbacks]
+
+    def safe_apply_callback(self, fun, *args):
+        try:
+            fun(*args)
+        except:
+            self.logger.error("Pool callback raised exception: %s" % (
+                traceback.format_exc(), ),
+                exc_info=sys.exc_info())
+
+    def _get_info(self):
+        return {}
+
+    @property
+    def info(self):
+        return self._get_info()
+
+    @property
+    def active(self):
+        return self._state == self.RUN
+
+

+ 35 - 111
celery/concurrency/evlet.py

@@ -1,120 +1,44 @@
-
-import threading
-
 from eventlet import GreenPile, GreenPool
 from eventlet import hubs
-from eventlet import spawn_n
-from eventlet.greenthread import sleep
-from Queue import Empty, Queue as LightQueue
-
-from celery import log
-from celery.utils.functional import partial
-from celery.datastructures import ExceptionInfo
-
-RUN = 0x1
-CLOSE = 0x2
-TERMINATE = 0x3
-
-
-accept_lock = threading.Lock()
-
+from eventlet import spawn
+from eventlet.queue import LightQueue
 
-def do_work(target, args=(), kwargs={}, callback=None,
-        accept_callback=None, method_queue=None):
-    method_queue.delegate(accept_callback)
-    callback(target(*args, **kwargs))
+from celery.concurrency.base import apply_target, BasePool
 
 
-class Waiter(threading.Thread):
+class TaskPool(BasePool):
 
-    def __init__(self, inqueue, limit):
-        self.inqueue = inqueue
-        self.limit = limit
-        self._state = None
-        threading.Thread.__init__(self)
+    def _forever_wait_for_pile(self):
+        avail = self._avail
+        pile = self._pile
 
-    def run(self):
-        hubs.use_hub()
-        pool = GreenPool(self.limit)
-        pile = GreenPile(pool)
-        self._state = RUN
-        inqueue = self.inqueue
-
-        def get_from_queue_forever():
-            while self._state == RUN:
-                try:
-                    print("+INQUEUE GET")
-                    m = inqueue.get_nowait()
-                    print("-INQUEUE GET")
-                except Empty:
-                    sleep(0.3)
-                else:
-                    print("+SPAWN")
-                    pile.spawn(*m)
-                    print("-SPAWN")
-
-        def wait_for_pile_forever():
-            while self._state == RUN:
-                print("+CALLING PILE NEXT")
+        while self.active:
+            try:
+                avail.queue.clear()
                 pile.next()
-                print("-CALLING PILE NEXT")
-
-        spawn_n(get_from_queue_forever)
-        spawn_n(wait_for_pile_forever)
-
-        while self._state == RUN:
-            sleep(0.1)
-
+            except StopIteration:
+                avail.get(block=True)  # wait for task
 
-
-class TaskPool(object):
-    _state = None
-
-    def __init__(self, limit, logger=None, **kwargs):
-        self.limit = limit
-        self.logger = logger or log.get_default_logger()
-        self._pool = None
-
-    def start(self):
-        self._state = RUN
-        self._out = LightQueue()
-        self._waiter = Waiter(self._out, self.limit)
-        self._waiter.start()
-
-    def stop(self):
-        self._state = CLOSE
-        self._pool.pool.waitall()
-        self._waiter._state = TERMINATE
-        self._state = TERMINATE
-
-    def apply_async(self, target, args=None, kwargs=None, callbacks=None,
-            errbacks=None, accept_callback=None, **compat):
-        if self._state != RUN:
-            return
-        args = args or []
-        kwargs = kwargs or {}
-        callbacks = callbacks or []
-        errbacks = errbacks or []
-
-        on_ready = partial(self.on_ready, callbacks, errbacks)
-
-        self.logger.debug("GreenPile: Spawn %s (args:%s kwargs:%s)" % (
-            target, args, kwargs))
-
-        print("+OUTQUEUE.PUT")
-        self._out.put((do_work, target, args, kwargs,
-                      on_ready, accept_callback, self.method_queue))
-        print("-OUTQUEUE.PUT")
-
-
-    def on_ready(self, callbacks, errbacks, ret_value):
-        """What to do when a worker task is ready and its return value has
-        been collected."""
-
-        if isinstance(ret_value, ExceptionInfo):
-            if isinstance(ret_value.exception, (
-                    SystemExit, KeyboardInterrupt)): # pragma: no cover
-                raise ret_value.exception
-            [errback(ret_value) for errback in errbacks]
-        else:
-            [callback(ret_value) for callback in callbacks]
+    def on_start(self):
+        hubs.use_hub()
+        self._avail = LightQueue()
+        self._pool = GreenPool(self.limit)
+        self._pile = GreenPile(self._pool)
+
+        spawn(self._forever_wait_for_pile)
+
+    def on_stop(self):
+        if self._pool is not None:
+            self._pool.waitall()
+
+    def on_apply(self, target, args=None, kwargs=None, callback=None,
+            accept_callback=None, **_):
+        self._pile.spawn(apply_target, target, args, kwargs,
+                         callback, accept_callback)
+        self._avail.put(1)  # notify waiters of new tasks.
+
+    @classmethod
+    def on_import(cls):
+        import eventlet
+        eventlet.monkey_patch()
+TaskPool.on_import()

+ 8 - 74
celery/concurrency/processes/__init__.py

@@ -3,17 +3,11 @@
 Process Pools.
 
 """
-import sys
-import traceback
-
-from celery import log
-from celery.datastructures import ExceptionInfo
-from celery.utils.functional import partial
-
+from celery.concurrency.base import BasePool
 from celery.concurrency.processes.pool import Pool, RUN
 
 
-class TaskPool(object):
+class TaskPool(BasePool):
     """Process Pool for processing tasks in parallel.
 
     :param processes: see :attr:`processes`.
@@ -31,95 +25,35 @@ class TaskPool(object):
     """
     Pool = Pool
 
-    def __init__(self, processes=None, putlocks=True, logger=None, **options):
-        self.processes = processes
-        self.putlocks = putlocks
-        self.logger = logger or log.get_default_logger()
-        self.options = options
-        self._pool = None
-
-    def start(self):
+    def on_start(self):
         """Run the task pool.
 
         Will pre-fork all workers so they're ready to accept tasks.
 
         """
-        self._pool = self.Pool(processes=self.processes, **self.options)
+        self._pool = self.Pool(processes=self.limit, **self.options)
+        self.on_apply = self._pool.apply_async
 
-    def stop(self):
+    def on_stop(self):
         """Gracefully stop the pool."""
         if self._pool is not None and self._pool._state == RUN:
             self._pool.close()
             self._pool.join()
             self._pool = None
 
-    def terminate(self):
+    def on_terminate(self):
         """Force terminate the pool."""
         if self._pool is not None:
             self._pool.terminate()
             self._pool = None
 
-    def apply_async(self, target, args=None, kwargs=None, callbacks=None,
-            errbacks=None, accept_callback=None, timeout_callback=None,
-            **compat):
-        """Equivalent of the :func:`apply` built-in function.
-
-        All `callbacks` and `errbacks` should complete immediately since
-        otherwise the thread which handles the result will get blocked.
-
-        """
-        args = args or []
-        kwargs = kwargs or {}
-        callbacks = callbacks or []
-        errbacks = errbacks or []
-
-        on_ready = partial(self.on_ready, callbacks, errbacks)
-        on_worker_error = partial(self.on_worker_error, errbacks)
-
-        self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
-            target, args, kwargs))
-
-        return self._pool.apply_async(target, args, kwargs,
-                                      callback=on_ready,
-                                      accept_callback=accept_callback,
-                                      timeout_callback=timeout_callback,
-                                      error_callback=on_worker_error,
-                                      waitforslot=self.putlocks)
-
     def grow(self, n=1):
         return self._pool.grow(n)
 
     def shrink(self, n=1):
         return self._pool.shrink(n)
 
-    def on_worker_error(self, errbacks, exc):
-        einfo = ExceptionInfo((exc.__class__, exc, None))
-        [errback(einfo) for errback in errbacks]
-
-    def on_ready(self, callbacks, errbacks, ret_value):
-        """What to do when a worker task is ready and its return value has
-        been collected."""
-
-        if isinstance(ret_value, ExceptionInfo):
-            if isinstance(ret_value.exception, (
-                    SystemExit, KeyboardInterrupt)):
-                raise ret_value.exception
-            [self.safe_apply_callback(errback, ret_value)
-                    for errback in errbacks]
-        else:
-            [self.safe_apply_callback(callback, ret_value)
-                    for callback in callbacks]
-
-    def safe_apply_callback(self, fun, *args):
-        try:
-            fun(*args)
-        except:
-            self.logger.error("Pool callback raised exception: %s" % (
-                traceback.format_exc(), ),
-                exc_info=sys.exc_info())
-
-    @property
-    def info(self):
+    def _get_info(self):
         return {"max-concurrency": self.processes,
                 "processes": [p.pid for p in self._pool._pool],
                 "max-tasks-per-child": self._pool._maxtasksperchild,

+ 10 - 53
celery/concurrency/threads.py

@@ -1,52 +1,21 @@
-
 import threading
 from threadpool import ThreadPool, WorkRequest
 
-from celery import log
-from celery.utils.functional import partial
-from celery.datastructures import ExceptionInfo
-
-
-accept_lock = threading.Lock()
-
-
-def do_work(target, args=(), kwargs={}, callback=None,
-        accept_callback=None):
-    accept_lock.acquire()
-    try:
-        accept_callback()
-    finally:
-        accept_lock.release()
-    callback(target(*args, **kwargs))
-
+from celery.concurrency.base import apply_target, BasePool
 
-class TaskPool(object):
 
-    def __init__(self, processes, logger=None, **kwargs):
-        self.processes = processes
-        self.logger = logger or log.get_default_logger()
-        self._pool = None
+class TaskPool(BasePool):
 
-    def start(self):
-        self._pool = ThreadPool(self.processes)
+    def on_start(self):
+        self._pool = ThreadPool(self.limit)
 
-    def stop(self):
-        self._pool.dismissWorkers(self.processes, do_join=True)
+    def on_stop(self):
+        self._pool.dismissWorkers(self.limit, do_join=True)
 
-    def apply_async(self, target, args=None, kwargs=None, callbacks=None,
-            errbacks=None, accept_callback=None, **compat):
-        args = args or []
-        kwargs = kwargs or {}
-        callbacks = callbacks or []
-        errbacks = errbacks or []
-
-        on_ready = partial(self.on_ready, callbacks, errbacks)
-
-        self.logger.debug("ThreadPool: Apply %s (args:%s kwargs:%s)" % (
-            target, args, kwargs))
-
-        req = WorkRequest(do_work, (target, args, kwargs, on_ready,
-                                    accept_callback))
+    def on_apply(self, target, args=None, kwargs=None, callback=None,
+            accept_callback=None, **_):
+        req = WorkRequest(apply_target, (target, args, kwargs, callback,
+                                         accept_callback))
         self._pool.putRequest(req)
         # threadpool also has callback support,
         # but for some reason the callback is not triggered
@@ -54,15 +23,3 @@ class TaskPool(object):
         # Clear the results (if any), so it doesn't grow too large.
         self._pool._results_queue.queue.clear()
         return req
-
-    def on_ready(self, callbacks, errbacks, ret_value):
-        """What to do when a worker task is ready and its return value has
-        been collected."""
-
-        if isinstance(ret_value, ExceptionInfo):
-            if isinstance(ret_value.exception, (
-                    SystemExit, KeyboardInterrupt)):    # pragma: no cover
-                raise ret_value.exception
-            [errback(ret_value) for errback in errbacks]
-        else:
-            [callback(ret_value) for callback in callbacks]