__init__.py

  1. """
  2. Process Pools.
  3. """
  4. import traceback
  5. from time import sleep, time
  6. from celery import log
  7. from celery.datastructures import ExceptionInfo
  8. from celery.utils.functional import partial
  9. from celery.concurrency.processes.pool import Pool, RUN
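

# pingback() is the trivial task that diagnose() below submits to every
# worker: it simply echoes its argument, so the result can be matched (via
# worker_pids()) to the worker process that executed it.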
def pingback(i):
    return i


class TaskPool(object):
    """Process Pool for processing tasks in parallel.

    :param limit: see :attr:`limit`.
    :param logger: see :attr:`logger`.

    .. attribute:: limit

        The number of processes that can run simultaneously.

    .. attribute:: logger

        The logger used for debugging.

    """
    Pool = Pool

    def __init__(self, limit, logger=None, initializer=None,
            maxtasksperchild=None, timeout=None, soft_timeout=None,
            putlocks=True, initargs=None):
        self.limit = limit
        self.logger = logger or log.get_default_logger()
        self.initializer = initializer
        self.initargs = initargs or ()  # keep a tuple to hand to the pool
        self.maxtasksperchild = maxtasksperchild
        self.timeout = timeout
        self.soft_timeout = soft_timeout
        self.putlocks = putlocks
        self._pool = None

    def start(self):
        """Run the task pool.

        Will pre-fork all workers so they're ready to accept tasks.

        """
        self._pool = self.Pool(processes=self.limit,
                               initializer=self.initializer,
                               initargs=self.initargs,
                               timeout=self.timeout,
                               soft_timeout=self.soft_timeout,
                               maxtasksperchild=self.maxtasksperchild)

    def stop(self):
        """Gracefully stop the pool."""
        if self._pool is not None and self._pool._state == RUN:
            self._pool.close()
            self._pool.join()
            self._pool = None

    def terminate(self):
        """Force terminate the pool."""
        if self._pool is not None:
            self._pool.terminate()
            self._pool = None
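
    # diagnose() probes the pool: it keeps submitting pingback() jobs and,
    # through the worker_pids() of each result, records which worker
    # processes have answered.  It stops once every pid in the pool has been
    # seen, or when the optional timeout expires, and reports the responsive
    # ("active") and unresponsive ("waiting") pids plus the iteration count.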
    def diagnose(self, timeout=None):
        pids = set(worker.pid for worker in self._pool._pool)
        seen = set()
        results = {}
        time_start = time()

        def callback(i):
            for pid in results[i].worker_pids():
                seen.add(pid)

        i = 0
        while pids ^ seen:
            print("%r > %r" % (time() - time_start, timeout))
            if timeout and time() - time_start > timeout:
                print("TIMED OUT i==%r" % (i, ))
                break
            results[i] = self._pool.apply_async(pingback,
                                                args=(i, ),
                                                callback=callback)
            sleep(0.1)
            i += 1
        return {"active": list(seen),
                "waiting": list(pids ^ seen),
                "iterations": i}

    def apply_async(self, target, args=None, kwargs=None, callbacks=None,
            errbacks=None, accept_callback=None, timeout_callback=None,
            **compat):
        """Equivalent of the :func:`apply` built-in function.

        All ``callbacks`` and ``errbacks`` should complete immediately since
        otherwise the thread which handles the result will get blocked.

        """
        args = args or []
        kwargs = kwargs or {}
        callbacks = callbacks or []
        errbacks = errbacks or []

        on_ready = partial(self.on_ready, callbacks, errbacks)
        on_worker_error = partial(self.on_worker_error, errbacks)

        self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
            target, args, kwargs))

        return self._pool.apply_async(target, args, kwargs,
                                      callback=on_ready,
                                      accept_callback=accept_callback,
                                      timeout_callback=timeout_callback,
                                      error_callback=on_worker_error,
                                      waitforslot=self.putlocks)
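
    # on_worker_error() is installed as the pool's error_callback: it wraps
    # the exception raised in the worker in an ExceptionInfo (no traceback
    # object is available at this point) and hands it to every registered
    # errback.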
    def on_worker_error(self, errbacks, exc):
        einfo = ExceptionInfo((exc.__class__, exc, None))
        for errback in errbacks:
            errback(einfo)

    def on_ready(self, callbacks, errbacks, ret_value):
        """What to do when a worker task is ready and its return value has
        been collected."""
        if isinstance(ret_value, ExceptionInfo):
            if isinstance(ret_value.exception,
                    (SystemExit, KeyboardInterrupt)):
                raise ret_value.exception
            for errback in errbacks:
                self.safe_apply_callback(errback, ret_value)
        else:
            for callback in callbacks:
                self.safe_apply_callback(callback, ret_value)
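
    # safe_apply_callback() shields the result-handler thread: any exception
    # raised by a callback is caught and logged instead of being propagated.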
    def safe_apply_callback(self, fun, *args):
        try:
            fun(*args)
        except:
            self.logger.error("Pool callback raised exception: %s" % (
                traceback.format_exc(), ))

    @property
    def info(self):
        return {"max-concurrency": self.limit,
                "processes": [p.pid for p in self._pool._pool],
                "max-tasks-per-child": self.maxtasksperchild,
                "put-guarded-by-semaphore": self.putlocks,
                "timeouts": (self.soft_timeout, self.timeout)}