threads.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import threading
  2. from threadpool import ThreadPool, WorkRequest
  3. from celery import log
  4. from celery.utils.functional import partial
  5. from celery.datastructures import ExceptionInfo
  6. accept_lock = threading.Lock()
  7. def do_work(target, args=(), kwargs={}, callback=None,
  8. accept_callback=None):
  9. accept_lock.acquire()
  10. try:
  11. accept_callback()
  12. finally:
  13. accept_lock.release()
  14. callback(target(*args, **kwargs))
  15. class TaskPool(object):
  16. def __init__(self, limit, logger=None, **kwargs):
  17. self.limit = limit
  18. self.logger = logger or log.get_default_logger()
  19. self._pool = None
  20. def start(self):
  21. self._pool = ThreadPool(self.limit)
  22. def stop(self):
  23. self._pool.dismissWorkers(self.limit, do_join=True)
  24. def apply_async(self, target, args=None, kwargs=None, callbacks=None,
  25. errbacks=None, accept_callback=None, **compat):
  26. args = args or []
  27. kwargs = kwargs or {}
  28. callbacks = callbacks or []
  29. errbacks = errbacks or []
  30. on_ready = partial(self.on_ready, callbacks, errbacks)
  31. self.logger.debug("ThreadPool: Apply %s (args:%s kwargs:%s)" % (
  32. target, args, kwargs))
  33. req = WorkRequest(do_work, (target, args, kwargs, on_ready,
  34. accept_callback))
  35. self._pool.putRequest(req)
  36. # threadpool also has callback support,
  37. # but for some reason the callback is not triggered
  38. # before you've collected the results.
  39. # Clear the results (if any), so it doesn't grow too large.
  40. self._pool._results_queue.queue.clear()
  41. return req
  42. def on_ready(self, callbacks, errbacks, ret_value):
  43. """What to do when a worker task is ready and its return value has
  44. been collected."""
  45. if isinstance(ret_value, ExceptionInfo):
  46. if isinstance(ret_value.exception, (
  47. SystemExit, KeyboardInterrupt)): # pragma: no cover
  48. raise ret_value.exception
  49. [errback(ret_value) for errback in errbacks]
  50. else:
  51. [callback(ret_value) for callback in callbacks]