threads.py 856 B

123456789101112131415161718192021222324
  1. from threadpool import ThreadPool, WorkRequest
  2. from celery.concurrency.base import apply_target, BasePool
  3. class TaskPool(BasePool):
  4. def on_start(self):
  5. self._pool = ThreadPool(self.limit)
  6. def on_stop(self):
  7. self._pool.dismissWorkers(self.limit, do_join=True)
  8. def on_apply(self, target, args=None, kwargs=None, callback=None,
  9. accept_callback=None, **_):
  10. req = WorkRequest(apply_target, (target, args, kwargs, callback,
  11. accept_callback))
  12. self._pool.putRequest(req)
  13. # threadpool also has callback support,
  14. # but for some reason the callback is not triggered
  15. # before you've collected the results.
  16. # Clear the results (if any), so it doesn't grow too large.
  17. self._pool._results_queue.queue.clear()
  18. return req