pool.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. """
  2. Process Pools.
  3. """
  4. import multiprocessing
  5. from multiprocessing.pool import Pool
  6. from celery.datastructures import ExceptionInfo
  7. from celery.utils import gen_unique_id
  8. from functools import partial as curry
  9. class TaskPool(object):
  10. """Process Pool for processing tasks in parallel.
  11. :param limit: see :attr:`limit` attribute.
  12. :param logger: see :attr:`logger` attribute.
  13. .. attribute:: limit
  14. The number of processes that can run simultaneously.
  15. .. attribute:: logger
  16. The logger used for debugging.
  17. """
  18. def __init__(self, limit, logger=None):
  19. self.limit = limit
  20. self.logger = logger or multiprocessing.get_logger()
  21. self._pool = None
  22. self._processes = None
  23. def start(self):
  24. """Run the task pool.
  25. Will pre-fork all workers so they're ready to accept tasks.
  26. """
  27. self._processes = {}
  28. self._pool = Pool(processes=self.limit)
  29. def stop(self):
  30. """Terminate the pool."""
  31. self._pool.terminate()
  32. self._processes = {}
  33. self._pool = None
  34. def apply_async(self, target, args=None, kwargs=None, callbacks=None,
  35. errbacks=None, meta=None):
  36. """Equivalent of the :func:``apply`` built-in function.
  37. All ``callbacks`` and ``errbacks`` should complete immediately since
  38. otherwise the thread which handles the result will get blocked.
  39. """
  40. args = args or []
  41. kwargs = kwargs or {}
  42. callbacks = callbacks or []
  43. errbacks = errbacks or []
  44. meta = meta or {}
  45. tid = gen_unique_id()
  46. on_return = curry(self.on_return, tid, callbacks, errbacks, meta)
  47. result = self._pool.apply_async(target, args, kwargs,
  48. callback=on_return)
  49. self._processes[tid] = [result, callbacks, errbacks, meta]
  50. return result
  51. def on_return(self, tid, callbacks, errbacks, meta, ret_value):
  52. """What to do when the process returns."""
  53. try:
  54. del(self._processes[tid])
  55. except KeyError:
  56. pass
  57. else:
  58. self.on_ready(callbacks, errbacks, meta, ret_value)
  59. def full(self):
  60. """Is the pool full?
  61. :returns: ``True`` if the maximum number of concurrent processes
  62. has been reached.
  63. """
  64. return len(self._processes.values()) >= self.limit
  65. def get_worker_pids(self):
  66. """Returns the process id's of all the pool workers."""
  67. return [process.pid for process in self._pool._pool]
  68. def on_ready(self, callbacks, errbacks, meta, ret_value):
  69. """What to do when a worker task is ready and its return value has
  70. been collected."""
  71. if isinstance(ret_value, ExceptionInfo):
  72. if isinstance(ret_value.exception, (
  73. SystemExit, KeyboardInterrupt)):
  74. raise ret_value.exception
  75. for errback in errbacks:
  76. errback(ret_value, meta)
  77. else:
  78. for callback in callbacks:
  79. callback(ret_value, meta)