# threads.py (1.2 KB)

  1. from celery.concurrency.base import apply_target, BasePool
  2. class TaskPool(BasePool):
  3. def __init__(self, *args, **kwargs):
  4. try:
  5. import threadpool
  6. except ImportError:
  7. raise ImportError(
  8. "The threaded pool requires the threadpool module.")
  9. self.WorkRequest = threadpool.WorkRequest
  10. self.ThreadPool = threadpool.ThreadPool
  11. super(TaskPool, self).__init__(*args, **kwargs)
  12. def on_start(self):
  13. self._pool = self.ThreadPool(self.limit)
  14. def on_stop(self):
  15. self._pool.dismissWorkers(self.limit, do_join=True)
  16. def on_apply(self, target, args=None, kwargs=None, callback=None,
  17. accept_callback=None, **_):
  18. req = self.WorkRequest(apply_target, (target, args, kwargs, callback,
  19. accept_callback))
  20. self._pool.putRequest(req)
  21. # threadpool also has callback support,
  22. # but for some reason the callback is not triggered
  23. # before you've collected the results.
  24. # Clear the results (if any), so it doesn't grow too large.
  25. self._pool._results_queue.queue.clear()
  26. return req