pool.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. """
  2. Process Pools.
  3. """
  4. import os
  5. import time
  6. import errno
  7. import multiprocessing
  8. from multiprocessing.pool import Pool, worker
  9. from celery.datastructures import ExceptionInfo
  10. from celery.utils import noop
  11. from functools import partial as curry
  12. from operator import isNumberType
  13. def pid_is_dead(pid):
  14. """Check if a process is not running by PID.
  15. :rtype bool:
  16. """
  17. try:
  18. return os.kill(pid, 0)
  19. except OSError, err:
  20. if err.errno == errno.ESRCH:
  21. return True # No such process.
  22. elif err.errno == errno.EPERM:
  23. return False # Operation not permitted.
  24. else:
  25. raise
  26. def reap_process(pid):
  27. """Reap process if the process is a zombie.
  28. :returns: ``True`` if process was reaped or is not running,
  29. ``False`` otherwise.
  30. """
  31. if pid_is_dead(pid):
  32. return True
  33. try:
  34. is_dead, _ = os.waitpid(pid, os.WNOHANG)
  35. except OSError, err:
  36. if err.errno == errno.ECHILD:
  37. return False # No child processes.
  38. raise
  39. return is_dead
  40. def process_is_dead(process):
  41. """Check if process is not running anymore.
  42. First it finds out if the process is running by sending
  43. signal 0. Then if the process is a child process, and is running
  44. it finds out if it's a zombie process and reaps it.
  45. If the process is running and is not a zombie it tries to send
  46. a ping through the process pipe.
  47. :param process: A :class:`multiprocessing.Process` instance.
  48. :returns: ``True`` if the process is not running, ``False`` otherwise.
  49. """
  50. # Try to see if the process is actually running,
  51. # and reap zombie proceses while we're at it.
  52. # Only do this if os.kill exists for this platform (e.g. Windows doesn't
  53. # support it).
  54. if callable(getattr(os, "kill", None)) and reap_process(process.pid):
  55. return True
  56. # Then try to ping the process using its pipe.
  57. try:
  58. proc_is_alive = process.is_alive()
  59. except OSError:
  60. return True
  61. else:
  62. return not proc_is_alive
  63. class DynamicPool(Pool):
  64. """Version of :class:`multiprocessing.Pool` that can dynamically grow
  65. in size."""
  66. def __init__(self, processes=None, initializer=None, initargs=()):
  67. if processes is None:
  68. try:
  69. processes = cpu_count()
  70. except NotImplementedError:
  71. processes = 1
  72. super(DynamicPool, self).__init__(processes=processes,
  73. initializer=initializer,
  74. initargs=initargs)
  75. self._initializer = initializer
  76. self._initargs = initargs
  77. self._size = processes
  78. self.logger = multiprocessing.get_logger()
  79. def _my_cleanup(self):
  80. from multiprocessing.process import _current_process
  81. for p in list(_current_process._children):
  82. discard = False
  83. try:
  84. status = p._popen.poll()
  85. except OSError:
  86. discard = True
  87. else:
  88. if status is not None:
  89. discard = True
  90. if discard:
  91. _current_process._children.discard(p)
  92. def add_worker(self):
  93. """Add another worker to the pool."""
  94. self._my_cleanup()
  95. w = self.Process(target=worker,
  96. args=(self._inqueue, self._outqueue,
  97. self._initializer, self._initargs))
  98. w.name = w.name.replace("Process", "PoolWorker")
  99. w.daemon = True
  100. w.start()
  101. self._pool.append(w)
  102. self.logger.debug(
  103. "DynamicPool: Started pool worker %s (PID: %s, Poolsize: %d)" %(
  104. w.name, w.pid, len(self._pool)))
  105. def grow(self, size=1):
  106. """Add workers to the pool.
  107. :keyword size: Number of workers to add (default: 1)
  108. """
  109. [self.add_worker() for i in range(size)]
  110. def _is_dead(self, process):
  111. """Try to find out if the process is dead.
  112. :rtype bool:
  113. """
  114. if process_is_dead(process):
  115. self.logger.info("DynamicPool: Found dead process (PID: %s)" % (
  116. process.pid))
  117. return True
  118. return False
  119. def _bring_out_the_dead(self):
  120. """Sort out dead process from pool.
  121. :returns: Tuple of two lists, the first list with dead processes,
  122. the second with active processes.
  123. """
  124. dead, alive = [], []
  125. for process in self._pool:
  126. if process and process.pid and isNumberType(process.pid):
  127. dest = dead if self._is_dead(process) else alive
  128. dest.append(process)
  129. return dead, alive
  130. def replace_dead_workers(self):
  131. """Replace dead workers in the pool by spawning new ones.
  132. :returns: number of dead processes replaced, or ``None`` if all
  133. processes are alive and running.
  134. """
  135. dead, alive = self._bring_out_the_dead()
  136. if dead:
  137. dead_count = len(dead)
  138. self._pool = alive
  139. self.grow(self._size if dead_count > self._size else dead_count)
  140. return dead_count
  141. class TaskPool(object):
  142. """Process Pool for processing tasks in parallel.
  143. :param limit: see :attr:`limit` attribute.
  144. :param logger: see :attr:`logger` attribute.
  145. .. attribute:: limit
  146. The number of processes that can run simultaneously.
  147. .. attribute:: logger
  148. The logger used for debugging.
  149. """
  150. def __init__(self, limit, logger=None):
  151. self.limit = limit
  152. self.logger = logger or multiprocessing.get_logger()
  153. self._pool = None
  154. def start(self):
  155. """Run the task pool.
  156. Will pre-fork all workers so they're ready to accept tasks.
  157. """
  158. self._pool = DynamicPool(processes=self.limit)
  159. def stop(self):
  160. """Terminate the pool."""
  161. self._pool.terminate()
  162. self._pool = None
  163. def replace_dead_workers(self):
  164. self.logger.debug("TaskPool: Finding dead pool processes...")
  165. dead_count = self._pool.replace_dead_workers()
  166. if dead_count:
  167. self.logger.info(
  168. "TaskPool: Replaced %d dead pool workers..." % (
  169. dead_count))
  170. def apply_async(self, target, args=None, kwargs=None, callbacks=None,
  171. errbacks=None, on_ack=noop):
  172. """Equivalent of the :func:``apply`` built-in function.
  173. All ``callbacks`` and ``errbacks`` should complete immediately since
  174. otherwise the thread which handles the result will get blocked.
  175. """
  176. args = args or []
  177. kwargs = kwargs or {}
  178. callbacks = callbacks or []
  179. errbacks = errbacks or []
  180. on_ready = curry(self.on_ready, callbacks, errbacks, on_ack)
  181. self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
  182. target, args, kwargs))
  183. self.replace_dead_workers()
  184. return self._pool.apply_async(target, args, kwargs,
  185. callback=on_ready)
  186. def on_ready(self, callbacks, errbacks, on_ack, ret_value):
  187. """What to do when a worker task is ready and its return value has
  188. been collected."""
  189. # Acknowledge the task as being processed.
  190. on_ack()
  191. if isinstance(ret_value, ExceptionInfo):
  192. if isinstance(ret_value.exception, (
  193. SystemExit, KeyboardInterrupt)):
  194. raise ret_value.exception
  195. [errback(ret_value) for errback in errbacks]
  196. else:
  197. [callback(ret_value) for callback in callbacks]