pool.py

  1. """
  2. Process Pools.
  3. """
  4. import multiprocessing
  5. from multiprocessing.pool import Pool, worker
  6. from celery.datastructures import ExceptionInfo
  7. from celery.utils import gen_unique_id
  8. from functools import partial as curry
  9. MAX_RESTART_FREQ = 10
  10. MAX_RESTART_FREQ_TIME = 60


class DynamicPool(Pool):
    """Version of :class:`multiprocessing.Pool` that can dynamically grow
    in size."""

    def __init__(self, processes=None, initializer=None, initargs=()):
        super(DynamicPool, self).__init__(processes=processes,
                                          initializer=initializer,
                                          initargs=initargs)
        self._initializer = initializer
        self._initargs = initargs

    def add_worker(self):
        """Add another worker to the pool."""
        w = self.Process(target=worker,
                         args=(self._inqueue, self._outqueue,
                               self._initializer, self._initargs))
        self._pool.append(w)
        w.name = w.name.replace("Process", "PoolWorker")
        w.daemon = True
        w.start()

    def grow(self, size=1):
        """Add ``size`` new workers to the pool."""
        # add_worker() takes no arguments, so use an explicit loop rather
        # than map().
        for _ in range(size):
            self.add_worker()

    def get_worker_pids(self):
        """Returns the process ids of all the pool workers."""
        return [process.pid for process in self.processes]

    def replace_dead_workers(self):
        """Replace workers that are no longer alive and return the list of
        dead processes."""
        dead = [process for process in self.processes
                    if not process.is_alive()]
        if dead:
            dead_pids = [process.pid for process in dead]
            self._pool = [process for process in self._pool
                              if process.pid not in dead_pids]
            self.grow(len(dead))
        return dead

    @property
    def processes(self):
        return self._pool
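

# Illustrative sketch (not part of the original module): growing the pool and
# replacing dead workers at runtime.  ``example_initializer`` is a
# hypothetical callable used only for this example.
#
#     pool = DynamicPool(processes=2, initializer=example_initializer)
#     pool.grow(2)                         # pool now has 4 workers
#     dead = pool.replace_dead_workers()   # respawn any workers that exited
#     print(pool.get_worker_pids())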


class MaxRestartsExceededError(Exception):
    """Raised when the pool exceeds the maximum restart frequency."""


class PoolSupervisor(object):
    """Supervisor implementing the "one_for_one" strategy.

    :param target: See :attr:`target`.
    :param max_restart_freq: See :attr:`max_restart_freq`.
    :param max_restart_freq_time: See :attr:`max_restart_freq_time`.

    .. attribute:: target

        The target pool to supervise.

    .. attribute:: max_restart_freq

        Limit the number of restarts which can occur in a given time interval.

        The max restart frequency is the number of restarts that can occur
        within the interval :attr:`max_restart_freq_time`.

        The restart mechanism prevents situations where the process repeatedly
        dies for the same reason. If this happens both the process and the
        supervisor are terminated.

    .. attribute:: max_restart_freq_time

        See :attr:`max_restart_freq`.

    """

    def __init__(self, target, max_restart_freq=MAX_RESTART_FREQ,
            max_restart_freq_time=MAX_RESTART_FREQ_TIME):
        self.target = target
        # The limit is per worker process, so scale it by the pool size.
        self.max_restart_freq = max_restart_freq * len(target.processes)
        self.max_restart_freq_time = max_restart_freq_time
        self.restart_frame_time = None
        self.restarts_in_frame = 0

    def restart_freq_exceeded(self):
        if not self.restart_frame_time:
            self.restart_frame_time = time.time()
            return False
        time_exceeded = time.time() > self.restart_frame_time + \
                self.max_restart_freq_time
        if time_exceeded and self.restarts_in_frame >= self.max_restart_freq:
            return True
        return False

    def supervise(self):
        dead = self.target.replace_dead_workers()
        if dead:
            self.restarts_in_frame += len(dead)
            if self.restart_freq_exceeded():
                raise MaxRestartsExceededError(
                        "Pool supervisor: Max restart frequency exceeded.")


class TaskPool(object):
    """Process Pool for processing tasks in parallel.

    :param limit: see :attr:`limit` attribute.
    :param logger: see :attr:`logger` attribute.

    .. attribute:: limit

        The number of processes that can run simultaneously.

    .. attribute:: logger

        The logger used for debugging.

    """

    def __init__(self, limit, logger=None):
        self.limit = limit
        self.logger = logger or multiprocessing.get_logger()
        self._pool = None
        self._supervisor = None

    def start(self):
        """Run the task pool.

        Will pre-fork all workers so they're ready to accept tasks.

        """
        self._pool = DynamicPool(processes=self.limit)
        self._supervisor = PoolSupervisor(self._pool)

    def stop(self):
        """Terminate the pool."""
        self._pool.terminate()
        self._pool = None

    def apply_async(self, target, args=None, kwargs=None, callbacks=None,
            errbacks=None, on_ack=None, meta=None):
        """Equivalent of the :func:`apply` built-in function.

        All ``callbacks`` and ``errbacks`` should complete immediately since
        otherwise the thread which handles the result will get blocked.

        """
        args = args or []
        kwargs = kwargs or {}
        callbacks = callbacks or []
        errbacks = errbacks or []
        meta = meta or {}

        on_return = curry(self.on_return, callbacks, errbacks,
                          on_ack, meta)

        self._supervisor.supervise()

        self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
            target, args, kwargs))

        return self._pool.apply_async(target, args, kwargs,
                                      callback=on_return)
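
    # Illustrative sketch (not part of the original module): callbacks passed
    # to apply_async() run in the pool's result-handler thread, so they
    # should only hand the result off, never do real work.  ``some_task`` and
    # ``pool`` (a started TaskPool) are hypothetical names used only here.
    #
    #     results = []
    #
    #     def on_success(ret_value, meta):
    #         results.append((ret_value, meta))    # returns immediately
    #
    #     pool.apply_async(some_task, args=[2, 2], callbacks=[on_success])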

    def on_return(self, callbacks, errbacks, on_ack, meta, ret_value):
        """What to do when the process returns."""

        # Acknowledge the task as being processed.
        if on_ack:
            on_ack()

        self.on_ready(callbacks, errbacks, meta, ret_value)

    def get_worker_pids(self):
        """Returns the process ids of all the pool workers."""
        return [process.pid for process in self._pool.processes]

    def on_ready(self, callbacks, errbacks, meta, ret_value):
        """What to do when a worker task is ready and its return value has
        been collected."""
        if isinstance(ret_value, ExceptionInfo):
            if isinstance(ret_value.exception, (
                    SystemExit, KeyboardInterrupt)):
                raise ret_value.exception
            for errback in errbacks:
                errback(ret_value, meta)
        else:
            for callback in callbacks:
                callback(ret_value, meta)
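

# Illustrative sketch (not part of the original module) of the full TaskPool
# life cycle: start the pool, submit a task with callbacks, then shut down.
# ``add``, ``on_success`` and ``on_error`` are hypothetical, defined only for
# this example.
#
#     def add(x, y):
#         return x + y
#
#     def on_success(ret_value, meta):
#         print("task succeeded: %r" % (ret_value, ))
#
#     def on_error(exc_info, meta):
#         print("task failed: %s" % (exc_info.exception, ))
#
#     pool = TaskPool(limit=4)
#     pool.start()
#     result = pool.apply_async(add, args=[2, 2],
#                               callbacks=[on_success], errbacks=[on_error])
#     result.wait()    # standard multiprocessing AsyncResult
#     pool.stop()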