__init__.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. """
  2. Process Pools.
  3. """
  4. import sys
  5. import traceback
  6. from celery import log
  7. from celery.datastructures import ExceptionInfo
  8. from celery.utils.functional import partial
  9. from celery.concurrency.processes.pool import Pool, RUN
  10. def pingback(i):
  11. return i
  12. class TaskPool(object):
  13. """Process Pool for processing tasks in parallel.
  14. :param limit: see :attr:`limit`.
  15. :param logger: see :attr:`logger`.
  16. .. attribute:: limit
  17. The number of processes that can run simultaneously.
  18. .. attribute:: logger
  19. The logger used for debugging.
  20. """
  21. Pool = Pool
  22. def __init__(self, limit, logger=None, initializer=None,
  23. maxtasksperchild=None, timeout=None, soft_timeout=None,
  24. putlocks=True, initargs=()):
  25. self.limit = limit
  26. self.logger = logger or log.get_default_logger()
  27. self.initializer = initializer
  28. self.initargs = initargs
  29. self.maxtasksperchild = maxtasksperchild
  30. self.timeout = timeout
  31. self.soft_timeout = soft_timeout
  32. self.putlocks = putlocks
  33. self.initargs = initargs
  34. self._pool = None
  35. def start(self):
  36. """Run the task pool.
  37. Will pre-fork all workers so they're ready to accept tasks.
  38. """
  39. self._pool = self.Pool(processes=self.limit,
  40. initializer=self.initializer,
  41. initargs=self.initargs,
  42. timeout=self.timeout,
  43. soft_timeout=self.soft_timeout,
  44. maxtasksperchild=self.maxtasksperchild)
  45. def stop(self):
  46. """Gracefully stop the pool."""
  47. if self._pool is not None and self._pool._state == RUN:
  48. self._pool.close()
  49. self._pool.join()
  50. self._pool = None
  51. def terminate(self):
  52. """Force terminate the pool."""
  53. if self._pool is not None:
  54. self._pool.terminate()
  55. self._pool = None
  56. def apply_async(self, target, args=None, kwargs=None, callbacks=None,
  57. errbacks=None, accept_callback=None, timeout_callback=None,
  58. **compat):
  59. """Equivalent of the :func:`apply` built-in function.
  60. All `callbacks` and `errbacks` should complete immediately since
  61. otherwise the thread which handles the result will get blocked.
  62. """
  63. args = args or []
  64. kwargs = kwargs or {}
  65. callbacks = callbacks or []
  66. errbacks = errbacks or []
  67. on_ready = partial(self.on_ready, callbacks, errbacks)
  68. on_worker_error = partial(self.on_worker_error, errbacks)
  69. self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
  70. target, args, kwargs))
  71. return self._pool.apply_async(target, args, kwargs,
  72. callback=on_ready,
  73. accept_callback=accept_callback,
  74. timeout_callback=timeout_callback,
  75. error_callback=on_worker_error,
  76. waitforslot=self.putlocks)
  77. def grow(self, n=1):
  78. return self._pool.grow(n)
  79. def shrink(self, n=1):
  80. return self._pool.shrink(n)
  81. def on_worker_error(self, errbacks, exc):
  82. einfo = ExceptionInfo((exc.__class__, exc, None))
  83. [errback(einfo) for errback in errbacks]
  84. def on_ready(self, callbacks, errbacks, ret_value):
  85. """What to do when a worker task is ready and its return value has
  86. been collected."""
  87. if isinstance(ret_value, ExceptionInfo):
  88. if isinstance(ret_value.exception, (
  89. SystemExit, KeyboardInterrupt)):
  90. raise ret_value.exception
  91. [self.safe_apply_callback(errback, ret_value)
  92. for errback in errbacks]
  93. else:
  94. [self.safe_apply_callback(callback, ret_value)
  95. for callback in callbacks]
  96. def safe_apply_callback(self, fun, *args):
  97. try:
  98. fun(*args)
  99. except:
  100. self.logger.error("Pool callback raised exception: %s" % (
  101. traceback.format_exc(), ),
  102. exc_info=sys.exc_info())
  103. @property
  104. def info(self):
  105. return {"max-concurrency": self.limit,
  106. "processes": [p.pid for p in self._pool._pool],
  107. "max-tasks-per-child": self.maxtasksperchild,
  108. "put-guarded-by-semaphore": self.putlocks,
  109. "timeouts": (self.soft_timeout, self.timeout)}