|
@@ -1,20 +1,28 @@
|
|
|
-from threadpool import ThreadPool, WorkRequest
|
|
|
-
|
|
|
from celery.concurrency.base import apply_target, BasePool
|
|
|
|
|
|
|
|
|
class TaskPool(BasePool):
|
|
|
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
+ try:
|
|
|
+ import threadpool
|
|
|
+ except ImportError:
|
|
|
+ raise ImportError(
|
|
|
+ "The threaded pool requires the threadpool module.")
|
|
|
+ self.WorkRequest = threadpool.WorkRequest
|
|
|
+ self.ThreadPool = threadpool.ThreadPool
|
|
|
+ super(TaskPool, self).__init__(*args, **kwargs)
|
|
|
+
|
|
|
def on_start(self):
|
|
|
- self._pool = ThreadPool(self.limit)
|
|
|
+ self._pool = self.ThreadPool(self.limit)
|
|
|
|
|
|
def on_stop(self):
|
|
|
self._pool.dismissWorkers(self.limit, do_join=True)
|
|
|
|
|
|
def on_apply(self, target, args=None, kwargs=None, callback=None,
|
|
|
accept_callback=None, **_):
|
|
|
- req = WorkRequest(apply_target, (target, args, kwargs, callback,
|
|
|
- accept_callback))
|
|
|
+ req = self.WorkRequest(apply_target, (target, args, kwargs, callback,
|
|
|
+ accept_callback))
|
|
|
self._pool.putRequest(req)
|
|
|
# threadpool also has callback support,
|
|
|
# but for some reason the callback is not triggered
|