pool.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. """
  2. Process Pools.
  3. """
  4. import multiprocessing
  5. import itertools
  6. import threading
  7. import uuid
  8. from multiprocessing.pool import RUN as POOL_STATE_RUN
  9. from celery.datastructures import ExceptionInfo
  10. from functools import partial as curry
  11. class TaskPool(object):
  12. """Pool of running child processes, which starts waiting for the
  13. processes to finish when the queue limit has been reached.
  14. :param limit: see :attr:`limit` attribute.
  15. :param logger: see :attr:`logger` attribute.
  16. .. attribute:: limit
  17. The number of processes that can run simultaneously until
  18. we start collecting results.
  19. .. attribute:: logger
  20. The logger used for debugging.
  21. """
  22. def __init__(self, limit, logger=None):
  23. self.limit = limit
  24. self.logger = logger or multiprocessing.get_logger()
  25. self._process_counter = itertools.count(1)
  26. self._processed_total = 0
  27. self._pool = None
  28. self._processes = None
  29. def start(self):
  30. """Run the task pool.
  31. Will pre-fork all workers so they're ready to accept tasks.
  32. """
  33. self._processes = {}
  34. self._pool = multiprocessing.Pool(processes=self.limit)
  35. def stop(self):
  36. """Terminate the pool."""
  37. self._pool.terminate()
  38. self._processes = {}
  39. self._pool = None
  40. def apply_async(self, target, args=None, kwargs=None, callbacks=None,
  41. errbacks=None, on_acknowledge=None, meta=None):
  42. """Equivalent of the :func:``apply`` built-in function.
  43. All ``callbacks`` and ``errbacks`` should complete immediately since
  44. otherwise the thread which handles the result will get blocked.
  45. """
  46. args = args or []
  47. kwargs = kwargs or {}
  48. callbacks = callbacks or []
  49. errbacks = errbacks or []
  50. meta = meta or {}
  51. tid = str(uuid.uuid4())
  52. self._processed_total = self._process_counter.next()
  53. on_return = curry(self.on_return, tid, callbacks, errbacks, meta)
  54. if self.full():
  55. self.wait_for_result()
  56. result = self._pool.apply_async(target, args, kwargs,
  57. callback=on_return)
  58. if on_acknowledge:
  59. on_acknowledge()
  60. self._processes[tid] = [result, callbacks, errbacks, meta]
  61. return result
  62. def on_return(self, tid, callbacks, errbacks, meta, ret_value):
  63. """What to do when the process returns."""
  64. try:
  65. del(self._processes[tid])
  66. except KeyError:
  67. pass
  68. else:
  69. self.on_ready(callbacks, errbacks, meta, ret_value)
  70. def full(self):
  71. """Is the pool full?
  72. :returns: ``True`` if the maximum number of concurrent processes
  73. has been reached.
  74. """
  75. return len(self._processes.values()) >= self.limit
  76. def wait_for_result(self):
  77. """Waits for the first process in the pool to finish.
  78. This operation is blocking.
  79. """
  80. while True:
  81. if self.reap():
  82. break
  83. def reap(self):
  84. """Reap finished tasks."""
  85. self.logger.debug("Reaping processes...")
  86. processes_reaped = 0
  87. for process_no, entry in enumerate(self._processes.items()):
  88. tid, process_info = entry
  89. result, callbacks, errbacks, meta = process_info
  90. try:
  91. ret_value = result.get(timeout=0.3)
  92. except multiprocessing.TimeoutError:
  93. continue
  94. else:
  95. self.on_return(tid, callbacks, errbacks, meta, ret_value)
  96. processes_reaped += 1
  97. return processes_reaped
  98. def get_worker_pids(self):
  99. """Returns the process id's of all the pool workers."""
  100. return [process.pid for process in self._pool._pool]
  101. def on_ready(self, callbacks, errbacks, meta, ret_value):
  102. """What to do when a worker task is ready and its return value has
  103. been collected."""
  104. if isinstance(ret_value, ExceptionInfo):
  105. if isinstance(ret_value.exception, (
  106. SystemExit, KeyboardInterrupt)):
  107. raise ret_value.exception
  108. for errback in errbacks:
  109. errback(ret_value, meta)
  110. else:
  111. for callback in callbacks:
  112. callback(ret_value, meta)