__init__.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. """
  2. Process Pools.
  3. """
  4. from celery.concurrency.base import BasePool
  5. from celery.concurrency.processes.pool import Pool, RUN
  6. class TaskPool(BasePool):
  7. """Process Pool for processing tasks in parallel.
  8. :param processes: see :attr:`processes`.
  9. :param logger: see :attr:`logger`.
  10. .. attribute:: limit
  11. The number of processes that can run simultaneously.
  12. .. attribute:: logger
  13. The logger used for debugging.
  14. """
  15. Pool = Pool
  16. def on_start(self):
  17. """Run the task pool.
  18. Will pre-fork all workers so they're ready to accept tasks.
  19. """
  20. self._pool = self.Pool(processes=self.limit, **self.options)
  21. self.on_apply = self._pool.apply_async
  22. def on_stop(self):
  23. """Gracefully stop the pool."""
  24. if self._pool is not None and self._pool._state == RUN:
  25. self._pool.close()
  26. self._pool.join()
  27. self._pool = None
  28. def on_terminate(self):
  29. """Force terminate the pool."""
  30. if self._pool is not None:
  31. self._pool.terminate()
  32. self._pool = None
  33. def grow(self, n=1):
  34. return self._pool.grow(n)
  35. def shrink(self, n=1):
  36. return self._pool.shrink(n)
  37. def _get_info(self):
  38. return {"max-concurrency": self.processes,
  39. "processes": [p.pid for p in self._pool._pool],
  40. "max-tasks-per-child": self._pool._maxtasksperchild,
  41. "put-guarded-by-semaphore": self.putlocks,
  42. "timeouts": (self._pool.soft_timeout, self._pool.timeout)}