pool.py

  1. """
  2. Process Pools.
  3. """
  4. import os
  5. import errno
  6. import multiprocessing
  7. from multiprocessing.pool import Pool, worker
  8. from celery.datastructures import ExceptionInfo
  9. from celery.utils import noop
  10. from celery.utils.functional import curry
  11. from operator import isNumberType


def pid_is_dead(pid):
    """Check if a process is not running by PID.

    :rtype bool:

    """
    try:
        # Signal 0 doesn't kill anything; it only checks that the process
        # exists. os.kill() returns None (false) when it succeeds.
        return os.kill(pid, 0)
    except OSError, err:
        if err.errno == errno.ESRCH:
            return True                 # No such process.
        elif err.errno == errno.EPERM:
            return False                # Operation not permitted.
        else:
            raise
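

# Illustrative example (not part of the original module): on a platform
# where ``os.kill`` is available, checking our own PID reports the process
# as alive, so the helper returns a false value:
#
#     >>> bool(pid_is_dead(os.getpid()))
#     False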


def reap_process(pid):
    """Reap process if the process is a zombie.

    :returns: ``True`` if process was reaped or is not running,
        ``False`` otherwise.

    """
    if pid_is_dead(pid):
        return True

    try:
        # Non-blocking wait: returns (0, 0) while the child is still
        # running, or (pid, status) once it has been reaped.
        is_dead, _ = os.waitpid(pid, os.WNOHANG)
    except OSError, err:
        if err.errno == errno.ECHILD:
            return False                # No child processes.
        raise
    return is_dead


def process_is_dead(process):
    """Check if process is not running anymore.

    First it finds out if the process is running by sending
    signal 0. Then, if the process is a child process and is running,
    it finds out whether it is a zombie process and reaps it.
    If the process is running and is not a zombie it tries to send
    a ping through the process pipe.

    :param process: A :class:`multiprocessing.Process` instance.

    :returns: ``True`` if the process is not running, ``False`` otherwise.

    """
    # Only do this if os.kill exists for this platform (e.g. Windows doesn't
    # support it).
    if callable(getattr(os, "kill", None)) and reap_process(process.pid):
        return True

    # Then try to ping the process using its pipe.
    try:
        proc_is_alive = process.is_alive()
    except OSError:
        return True
    else:
        return not proc_is_alive
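

# Illustrative example (not part of the original module; assumes a
# Unix-like platform): a running child is reported as alive, while a
# terminated and reaped child is reported as dead:
#
#     >>> import time
#     >>> p = multiprocessing.Process(target=time.sleep, args=(10, ))
#     >>> p.start()
#     >>> process_is_dead(p)
#     False
#     >>> p.terminate(); p.join()
#     >>> process_is_dead(p)
#     True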


class DynamicPool(Pool):
    """Version of :class:`multiprocessing.Pool` that can dynamically grow
    in size."""

    def __init__(self, processes=None, initializer=None, initargs=()):
        if processes is None:
            try:
                processes = multiprocessing.cpu_count()
            except NotImplementedError:
                processes = 1
        super(DynamicPool, self).__init__(processes=processes,
                                          initializer=initializer,
                                          initargs=initargs)
        self._initializer = initializer
        self._initargs = initargs
        self._size = processes
        self.logger = multiprocessing.get_logger()

    def _my_cleanup(self):
        # Drop exited children from multiprocessing's bookkeeping so they
        # don't accumulate as new workers are added.
        from multiprocessing.process import _current_process
        for p in list(_current_process._children):
            discard = False
            try:
                status = p._popen.poll()
            except OSError:
                discard = True
            else:
                if status is not None:
                    discard = True
            if discard:
                _current_process._children.discard(p)

    def add_worker(self):
        """Add another worker to the pool."""
        self._my_cleanup()
        # The new worker shares the pool's existing task and result queues.
        w = self.Process(target=worker,
                         args=(self._inqueue, self._outqueue,
                               self._initializer, self._initargs))
        w.name = w.name.replace("Process", "PoolWorker")
        w.daemon = True
        w.start()
        self._pool.append(w)
        self.logger.debug(
            "DynamicPool: Started pool worker %s (PID: %s, Poolsize: %d)" % (
                w.name, w.pid, len(self._pool)))

    def grow(self, size=1):
        """Add workers to the pool.

        :keyword size: Number of workers to add (default: 1)

        """
        for _ in range(size):
            self.add_worker()

    def _is_dead(self, process):
        """Try to find out if the process is dead.

        :rtype bool:

        """
        if process_is_dead(process):
            self.logger.info("DynamicPool: Found dead process (PID: %s)" % (
                process.pid))
            return True
        return False

    def _bring_out_the_dead(self):
        """Sort out dead processes from the pool.

        :returns: Tuple of two lists, the first list with dead processes,
            the second with active processes.

        """
        dead, alive = [], []
        for process in self._pool:
            if process and process.pid and isNumberType(process.pid):
                dest = dead if self._is_dead(process) else alive
                dest.append(process)
        return dead, alive

    def replace_dead_workers(self):
        """Replace dead workers in the pool by spawning new ones.

        :returns: number of dead processes replaced, or ``None`` if all
            processes are alive and running.

        """
        dead, alive = self._bring_out_the_dead()
        if dead:
            dead_count = len(dead)
            self._pool = alive
            self.grow(self._size if dead_count > self._size else dead_count)
            return dead_count
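

# Illustrative sketch (not part of the original module): the pool can be
# grown after creation, and workers that have died can be respawned in
# place:
#
#     >>> pool = DynamicPool(processes=2)
#     >>> pool.grow(size=2)               # pool now has 4 workers
#     >>> pool.replace_dead_workers()     # None if every worker is alive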


class TaskPool(object):
    """Process Pool for processing tasks in parallel.

    :param limit: see :attr:`limit` attribute.
    :param logger: see :attr:`logger` attribute.

    .. attribute:: limit

        The number of processes that can run simultaneously.

    .. attribute:: logger

        The logger used for debugging.

    """

    def __init__(self, limit, logger=None):
        self.limit = limit
        self.logger = logger or multiprocessing.get_logger()
        self._pool = None

    def start(self):
        """Run the task pool.

        Will pre-fork all workers so they're ready to accept tasks.

        """
        self._pool = DynamicPool(processes=self.limit)

    def stop(self):
        """Terminate the pool."""
        self._pool.terminate()
        self._pool = None

    def replace_dead_workers(self):
        """Find dead pool workers and replace them with fresh processes."""
        self.logger.debug("TaskPool: Finding dead pool processes...")
        dead_count = self._pool.replace_dead_workers()
        if dead_count:
            self.logger.info(
                "TaskPool: Replaced %d dead pool workers..." % (
                    dead_count))

    def apply_async(self, target, args=None, kwargs=None, callbacks=None,
            errbacks=None, on_ack=noop):
        """Equivalent of the :func:`apply` built-in function.

        All ``callbacks`` and ``errbacks`` should complete immediately since
        otherwise the thread which handles the result will get blocked.

        """
        args = args or []
        kwargs = kwargs or {}
        callbacks = callbacks or []
        errbacks = errbacks or []
        on_ready = curry(self.on_ready, callbacks, errbacks, on_ack)

        self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
            target, args, kwargs))

        self.replace_dead_workers()

        return self._pool.apply_async(target, args, kwargs,
                                      callback=on_ready)

    def on_ready(self, callbacks, errbacks, on_ack, ret_value):
        """What to do when a worker task is ready and its return value has
        been collected."""
        # Acknowledge the task as being processed.
        on_ack()

        if isinstance(ret_value, ExceptionInfo):
            if isinstance(ret_value.exception, (
                    SystemExit, KeyboardInterrupt)):
                raise ret_value.exception
            for errback in errbacks:
                errback(ret_value)
        else:
            for callback in callbacks:
                callback(ret_value)
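

# The block below is an illustrative usage sketch, not part of the original
# module: it runs a single task through a TaskPool and prints the collected
# return value. ``operator.add`` is used as the task body because it is
# picklable and can therefore be sent to a pool worker.
if __name__ == "__main__":
    import operator

    def print_result(ret_value):
        print("Task returned: %r" % (ret_value, ))

    pool = TaskPool(limit=2)
    pool.start()
    result = pool.apply_async(operator.add, args=[2, 2],
                              callbacks=[print_result])
    result.wait()   # Blocks until the result handler has run the callback.
    pool.stop()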