pool.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
"""
Process Pools.

Provides :class:`TaskPool`, a wrapper around :class:`multiprocessing.Pool`
that keeps track of submitted tasks and dispatches their results to
per-task callbacks and errbacks.
"""
  4. import multiprocessing
  5. import itertools
  6. import threading
  7. import uuid
  8. from multiprocessing.pool import RUN as POOL_STATE_RUN
  9. from celery.datastructures import ExceptionInfo
  10. class TaskPool(object):
  11. """Pool of running child processes, which starts waiting for the
  12. processes to finish when the queue limit has been reached.
  13. :param limit: see :attr:`limit` attribute.
  14. :param logger: see :attr:`logger` attribute.
  15. .. attribute:: limit
  16. The number of processes that can run simultaneously until
  17. we start collecting results.
  18. .. attribute:: logger
  19. The logger used for debugging.
  20. """
  21. def __init__(self, limit, reap_timeout=None, logger=None):
  22. self.limit = limit
  23. self.logger = logger or multiprocessing.get_logger()
  24. self.reap_timeout = reap_timeout
  25. self._process_counter = itertools.count(1)
  26. self._processed_total = 0
  27. self._pool = None
  28. self._processes = None
  29. def run(self):
  30. """Run the task pool.
  31. Will launch all worker processes so they are ready
  32. for processing tasks.
  33. """
  34. self._start()
  35. def _start(self):
  36. """INTERNAL: Starts the pool. Used by :meth:`run`."""
  37. self._processes = {}
  38. self._pool = multiprocessing.Pool(processes=self.limit)
  39. def _terminate_and_restart(self):
  40. """INTERNAL: Terminate and restart the pool."""
  41. try:
  42. self._pool.terminate()
  43. except OSError:
  44. pass
  45. self._start()
  46. def _restart(self):
  47. """INTERNAL: Close and restart the pool."""
  48. self.logger.info("Closing and restarting the pool...")
  49. self._pool.close()
  50. timeout_thread = threading.Timer(30.0, self._terminate_and_restart)
  51. timeout_thread.start()
  52. self._pool.join()
  53. timeout_thread.cancel()
  54. self._start()
  55. def _pool_is_running(self):
  56. """Check if the pool is in the run state.
  57. :returns: ``True`` if the pool is running.
  58. """
  59. return self._pool._state == POOL_STATE_RUN
  60. def apply_async(self, target, args=None, kwargs=None, callbacks=None,
  61. errbacks=None, meta=None):
  62. """Equivalent of the :func:``apply`` built-in function.
  63. All ``callbacks`` and ``errbacks`` should complete immediately since
  64. otherwise the thread which handles the result will get blocked.
  65. """
  66. args = args or []
  67. kwargs = kwargs or {}
  68. callbacks = callbacks or []
  69. errbacks = errbacks or []
  70. meta = meta or {}
  71. tid = str(uuid.uuid4())
  72. if not self._pool_is_running():
  73. self._start()
  74. self._processed_total = self._process_counter.next()
  75. on_return = lambda r: self.on_return(r, tid, callbacks, errbacks, meta)
  76. result = self._pool.apply_async(target, args, kwargs,
  77. callback=on_return)
  78. self.add(result, callbacks, errbacks, tid, meta)
  79. return result
  80. def on_return(self, ret_val, tid, callbacks, errbacks, meta):
  81. """What to do when the process returns."""
  82. try:
  83. del(self._processes[tid])
  84. except KeyError:
  85. pass
  86. else:
  87. self.on_ready(ret_val, callbacks, errbacks, meta)
  88. def add(self, result, callbacks, errbacks, tid, meta):
  89. """Add a process to the queue.
  90. If the queue is full, it will wait for the first task to finish,
  91. collects its result and remove it from the queue, so it's ready
  92. to accept new processes.
  93. :param result: A :class:`multiprocessing.AsyncResult` instance, as
  94. returned by :meth:`multiprocessing.Pool.apply_async`.
  95. :option callbacks: List of callbacks to execute if the task was
  96. successful. Must have the function signature:
  97. ``mycallback(result, meta)``
  98. :option errbacks: List of errbacks to execute if the task raised
  99. and exception. Must have the function signature:
  100. ``myerrback(exc, meta)``.
  101. :option tid: The tid for this task (unqiue pool id).
  102. """
  103. self._processes[tid] = [result, callbacks, errbacks, meta]
  104. if self.full():
  105. self.wait_for_result()
  106. def full(self):
  107. """Is the pool full?
  108. :returns: ``True`` if the maximum number of concurrent processes
  109. has been reached.
  110. """
  111. return len(self._processes.values()) >= self.limit
  112. def wait_for_result(self):
  113. """Waits for the first process in the pool to finish.
  114. This operation is blocking.
  115. """
  116. while True:
  117. if self.reap():
  118. break
  119. def reap(self):
  120. """Reap finished tasks."""
  121. self.logger.debug("Reaping processes...")
  122. processes_reaped = 0
  123. for process_no, entry in enumerate(self._processes.items()):
  124. tid, process_info = entry
  125. result, callbacks, errbacks, meta = process_info
  126. try:
  127. ret_value = result.get(timeout=0.3)
  128. except multiprocessing.TimeoutError:
  129. continue
  130. else:
  131. self.on_return(ret_value, tid, callbacks, errbacks, meta)
  132. processes_reaped += 1
  133. return processes_reaped
  134. def get_worker_pids(self):
  135. """Returns the process id's of all the pool workers.
  136. :rtype: list
  137. """
  138. return [process.pid for process in self._pool._pool]
  139. def on_ready(self, ret_value, callbacks, errbacks, meta):
  140. """What to do when a worker task is ready and its return value has
  141. been collected."""
  142. if isinstance(ret_value, ExceptionInfo):
  143. for errback in errbacks:
  144. errback(ret_value, meta)
  145. else:
  146. for callback in callbacks:
  147. callback(ret_value, meta)