pool.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. """
  2. Process Pools.
  3. """
  4. from billiard.pool import DynamicPool
  5. from billiard.utils.functional import curry
  6. from celery import log
  7. from celery.datastructures import ExceptionInfo
  8. class TaskPool(object):
  9. """Process Pool for processing tasks in parallel.
  10. :param limit: see :attr:`limit`.
  11. :param logger: see :attr:`logger`.
  12. .. attribute:: limit
  13. The number of processes that can run simultaneously.
  14. .. attribute:: logger
  15. The logger used for debugging.
  16. """
  17. def __init__(self, limit, logger=None, initializer=None):
  18. self.limit = limit
  19. self.logger = logger or log.get_default_logger()
  20. self.initializer = initializer
  21. self._pool = None
  22. def start(self):
  23. """Run the task pool.
  24. Will pre-fork all workers so they're ready to accept tasks.
  25. """
  26. self._pool = DynamicPool(processes=self.limit,
  27. initializer=self.initializer)
  28. def stop(self):
  29. """Terminate the pool."""
  30. self._pool.close()
  31. self._pool.join()
  32. self._pool = None
  33. def replace_dead_workers(self):
  34. self.logger.debug("TaskPool: Finding dead pool processes...")
  35. dead_count = self._pool.replace_dead_workers()
  36. if dead_count: # pragma: no cover
  37. self.logger.info(
  38. "TaskPool: Replaced %d dead pool workers..." % (
  39. dead_count))
  40. def apply_async(self, target, args=None, kwargs=None, callbacks=None,
  41. errbacks=None, accept_callback=None, **compat):
  42. """Equivalent of the :func:``apply`` built-in function.
  43. All ``callbacks`` and ``errbacks`` should complete immediately since
  44. otherwise the thread which handles the result will get blocked.
  45. """
  46. args = args or []
  47. kwargs = kwargs or {}
  48. callbacks = callbacks or []
  49. errbacks = errbacks or []
  50. on_ready = curry(self.on_ready, callbacks, errbacks)
  51. self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
  52. target, args, kwargs))
  53. self.replace_dead_workers()
  54. return self._pool.apply_async(target, args, kwargs,
  55. callback=on_ready,
  56. accept_callback=accept_callback)
  57. def on_ready(self, callbacks, errbacks, ret_value):
  58. """What to do when a worker task is ready and its return value has
  59. been collected."""
  60. if isinstance(ret_value, ExceptionInfo):
  61. if isinstance(ret_value.exception, (
  62. SystemExit, KeyboardInterrupt)): # pragma: no cover
  63. raise ret_value.exception
  64. [errback(ret_value) for errback in errbacks]
  65. else:
  66. [callback(ret_value) for callback in callbacks]