pool.py

  1. """
  2. Process Pools.
  3. """
  4. import os
  5. import time
  6. import errno
  7. import multiprocessing
  8. from multiprocessing.pool import Pool, worker
  9. from celery.datastructures import ExceptionInfo
  10. from celery.utils import gen_unique_id
  11. from functools import partial as curry


def pid_is_dead(pid):
    """Check if a process is not running by PID.

    :rtype: bool

    """
    try:
        # Signal 0 does not actually send a signal; os.kill() simply raises
        # OSError if the process no longer exists.
        return os.kill(pid, 0)
    except OSError, err:
        if err.errno == errno.ESRCH:
            return True             # No such process.
        elif err.errno == errno.EPERM:
            return False            # Operation not permitted.
        else:
            raise


def reap_process(pid):
    """Reap process if the process is a zombie.

    :returns: ``True`` if process was reaped or is not running,
        ``False`` otherwise.

    """
    if pid_is_dead(pid):
        return True

    try:
        # waitpid() with WNOHANG returns (0, 0) while the child is still
        # running, and (pid, status) once it has exited and been reaped.
        is_dead, _ = os.waitpid(pid, os.WNOHANG)
    except OSError, err:
        if err.errno == errno.ECHILD:
            return False            # No child processes.
        raise
    return is_dead


def process_is_dead(process):
    """Check if process is not running anymore.

    First it finds out if the process is running by sending
    signal 0. Then, if the process is a child process and is running,
    it finds out if it's a zombie process and reaps it.
    If the process is running and is not a zombie it tries to send
    a ping through the process pipe.

    :param process: A :class:`multiprocessing.Process` instance.

    :returns: ``True`` if the process is not running, ``False`` otherwise.

    """

    # Make sure PID is an integer (no idea why this happens).
    try:
        int(process.pid)
    except (TypeError, ValueError):
        return True

    # Try to see if the process is actually running,
    # and reap zombie processes while we're at it.
    if reap_process(process.pid):
        return True

    # Then try to ping the process using its pipe.
    try:
        proc_is_alive = process.is_alive()
    except OSError:
        return True
    else:
        return not proc_is_alive
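

# Illustrative only (not part of the original module): the helpers above can
# be combined to poll a child process, e.g.:
#
#     child = multiprocessing.Process(target=time.sleep, args=(10, ))
#     child.start()
#     process_is_dead(child)      # -> False while the child is running
#     child.terminate()
#     child.join()
#     process_is_dead(child)      # -> True once the child has exited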


class DynamicPool(Pool):
    """Version of :class:`multiprocessing.Pool` that can dynamically grow
    in size."""

    def __init__(self, processes=None, initializer=None, initargs=()):
        if processes is None:
            try:
                processes = multiprocessing.cpu_count()
            except NotImplementedError:
                processes = 1

        super(DynamicPool, self).__init__(processes=processes,
                                          initializer=initializer,
                                          initargs=initargs)
        self._initializer = initializer
        self._initargs = initargs
        self._size = processes
        self.logger = multiprocessing.get_logger()

    def add_worker(self):
        """Add another worker to the pool."""
        w = self.Process(target=worker,
                         args=(self._inqueue, self._outqueue,
                               self._initializer, self._initargs))
        self._pool.append(w)
        w.name = w.name.replace("Process", "PoolWorker")
        w.daemon = True
        w.start()

    def grow(self, size=1):
        """Add workers to the pool.

        :keyword size: Number of workers to add (default: 1)

        """
        for i in range(size):
            self.add_worker()

    def _is_dead(self, process):
        """Try to find out if the process is dead.

        :rtype: bool

        """
        if process_is_dead(process):
            self.logger.info("DynamicPool: Found dead process (PID: %s)" % (
                process.pid))
            return True
        return False

    def _bring_out_the_dead(self):
        """Sort out dead processes from the pool.

        :returns: Tuple of two lists, the first list with dead processes,
            the second with active processes.

        """
        dead, alive = [], []
        for process in self._pool:
            if process and process.pid:
                dest = dead if self._is_dead(process) else alive
                dest.append(process)
        return dead, alive

    def replace_dead_workers(self):
        """Replace dead workers in the pool by spawning new ones.

        :returns: Number of dead workers replaced.

        """
        dead, self._pool = self._bring_out_the_dead()
        # Never spawn more replacements than the configured pool size.
        dead_count = min(len(dead), self._size)
        self.grow(dead_count)
        return dead_count
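

# A minimal sketch (not part of the original module) of how a supervisor is
# expected to drive DynamicPool: call replace_dead_workers() periodically so
# that crashed workers are respawned, e.g.:
#
#     pool = DynamicPool(processes=4)
#     while True:                          # hypothetical supervisor loop
#         pool.replace_dead_workers()
#         time.sleep(1)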


class TaskPool(object):
    """Process Pool for processing tasks in parallel.

    :param limit: see :attr:`limit` attribute.
    :param logger: see :attr:`logger` attribute.

    .. attribute:: limit

        The number of processes that can run simultaneously.

    .. attribute:: logger

        The logger used for debugging.

    """

    def __init__(self, limit, logger=None):
        self.limit = limit
        self.logger = logger or multiprocessing.get_logger()
        self._pool = None

    def start(self):
        """Run the task pool.

        Will pre-fork all workers so they're ready to accept tasks.

        """
        self._pool = DynamicPool(processes=self.limit)

    def stop(self):
        """Terminate the pool."""
        self._pool.terminate()
        self._pool = None

    def replace_dead_workers(self):
        self.logger.debug("TaskPool: Finding dead pool processes...")
        dead_count = self._pool.replace_dead_workers()
        if dead_count:
            self.logger.info(
                "TaskPool: Replaced %d dead pool workers..." % (
                    dead_count))

    def apply_async(self, target, args=None, kwargs=None, callbacks=None,
                    errbacks=None, on_ack=None, meta=None):
        """Equivalent of the :func:`apply` built-in function.

        All ``callbacks`` and ``errbacks`` should complete immediately since
        otherwise the thread which handles the result will get blocked.

        """
        args = args or []
        kwargs = kwargs or {}
        callbacks = callbacks or []
        errbacks = errbacks or []
        meta = meta or {}

        on_return = curry(self.on_return, callbacks, errbacks,
                          on_ack, meta)

        self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
            target, args, kwargs))

        self.replace_dead_workers()

        return self._pool.apply_async(target, args, kwargs,
                                      callback=on_return)

    def on_return(self, callbacks, errbacks, on_ack, meta, ret_value):
        """What to do when the process returns."""

        # Acknowledge the task as being processed.
        if on_ack:
            on_ack()

        self.on_ready(callbacks, errbacks, meta, ret_value)

    def on_ready(self, callbacks, errbacks, meta, ret_value):
        """What to do when a worker task is ready and its return value has
        been collected."""

        if isinstance(ret_value, ExceptionInfo):
            if isinstance(ret_value.exception, (
                    SystemExit, KeyboardInterrupt)):
                raise ret_value.exception
            for errback in errbacks:
                errback(ret_value, meta)
        else:
            for callback in callbacks:
                callback(ret_value, meta)
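

# The block below is an illustrative usage sketch, not part of the original
# module. It assumes a Unix fork-based start and an importable celery package,
# and the names example_task / on_success are hypothetical. It shows the
# intended lifecycle: start() pre-forks the workers, apply_async() dispatches
# a task with a success callback, and stop() terminates the pool.
if __name__ == "__main__":

    def example_task(x, y):
        # Hypothetical task used only for this demo; must be picklable.
        return x + y

    def on_success(ret_value, meta):
        # Callbacks receive the return value and the task metadata dict.
        print("example_task returned: %r (meta: %r)" % (ret_value, meta))

    pool = TaskPool(limit=2)
    pool.start()
    result = pool.apply_async(example_task, args=[2, 2],
                              callbacks=[on_success])
    result.wait()               # block until the worker has returned
    pool.stop()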