__init__.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. """
  2. Process Pools.
  3. """
  4. from celery import log
  5. from celery.datastructures import ExceptionInfo
  6. from celery.utils.functional import curry
  7. from celery.concurrency.processes.pool import Pool, RUN
  8. class TaskPool(object):
  9. """Process Pool for processing tasks in parallel.
  10. :param limit: see :attr:`limit`.
  11. :param logger: see :attr:`logger`.
  12. .. attribute:: limit
  13. The number of processes that can run simultaneously.
  14. .. attribute:: logger
  15. The logger used for debugging.
  16. """
  17. Pool = Pool
  18. def __init__(self, limit, logger=None, initializer=None,
  19. maxtasksperchild=None, timeout=None, soft_timeout=None,
  20. putlocks=True):
  21. self.limit = limit
  22. self.logger = logger or log.get_default_logger()
  23. self.initializer = initializer
  24. self.maxtasksperchild = maxtasksperchild
  25. self.timeout = timeout
  26. self.soft_timeout = soft_timeout
  27. self.putlocks = putlocks
  28. self._pool = None
  29. def start(self):
  30. """Run the task pool.
  31. Will pre-fork all workers so they're ready to accept tasks.
  32. """
  33. self._pool = self.Pool(processes=self.limit,
  34. initializer=self.initializer,
  35. timeout=self.timeout,
  36. soft_timeout=self.soft_timeout,
  37. maxtasksperchild=self.maxtasksperchild)
  38. def stop(self):
  39. """Gracefully stop the pool."""
  40. if self._pool is not None and self._pool._state == RUN:
  41. self._pool.close()
  42. self._pool.join()
  43. self._pool = None
  44. def terminate(self):
  45. """Force terminate the pool."""
  46. if self._pool is not None:
  47. self._pool.terminate()
  48. self._pool = None
  49. def apply_async(self, target, args=None, kwargs=None, callbacks=None,
  50. errbacks=None, accept_callback=None, timeout_callback=None,
  51. **compat):
  52. """Equivalent of the :func:``apply`` built-in function.
  53. All ``callbacks`` and ``errbacks`` should complete immediately since
  54. otherwise the thread which handles the result will get blocked.
  55. """
  56. args = args or []
  57. kwargs = kwargs or {}
  58. callbacks = callbacks or []
  59. errbacks = errbacks or []
  60. on_ready = curry(self.on_ready, callbacks, errbacks)
  61. self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
  62. target, args, kwargs))
  63. return self._pool.apply_async(target, args, kwargs,
  64. callback=on_ready,
  65. accept_callback=accept_callback,
  66. timeout_callback=timeout_callback,
  67. waitforslot=self.putlocks)
  68. def on_ready(self, callbacks, errbacks, ret_value):
  69. """What to do when a worker task is ready and its return value has
  70. been collected."""
  71. if isinstance(ret_value, ExceptionInfo):
  72. if isinstance(ret_value.exception, (
  73. SystemExit, KeyboardInterrupt)):
  74. raise ret_value.exception
  75. [errback(ret_value) for errback in errbacks]
  76. else:
  77. [callback(ret_value) for callback in callbacks]
  78. @property
  79. def info(self):
  80. return {"max-concurrency": self.limit,
  81. "processes": [p.pid for p in self._pool._pool],
  82. "max-tasks-per-child": self.maxtasksperchild,
  83. "put-guarded-by-semaphore": self.putlocks,
  84. "timeouts": (self.soft_timeout, self.timeout)}