__init__.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. """
  2. Process Pools.
  3. """
  4. import os
  5. import signal as _signal
  6. from celery.concurrency.base import BasePool
  7. from celery.concurrency.processes.pool import Pool, RUN
  8. class TaskPool(BasePool):
  9. """Process Pool for processing tasks in parallel.
  10. :param processes: see :attr:`processes`.
  11. :param logger: see :attr:`logger`.
  12. .. attribute:: limit
  13. The number of processes that can run simultaneously.
  14. .. attribute:: logger
  15. The logger used for debugging.
  16. """
  17. Pool = Pool
  18. def on_start(self):
  19. """Run the task pool.
  20. Will pre-fork all workers so they're ready to accept tasks.
  21. """
  22. self._pool = self.Pool(processes=self.limit, **self.options)
  23. self.on_apply = self._pool.apply_async
  24. def on_stop(self):
  25. """Gracefully stop the pool."""
  26. if self._pool is not None and self._pool._state == RUN:
  27. self._pool.close()
  28. self._pool.join()
  29. self._pool = None
  30. def on_terminate(self):
  31. """Force terminate the pool."""
  32. if self._pool is not None:
  33. self._pool.terminate()
  34. self._pool = None
  35. def terminate_job(self, pid, signal=None):
  36. os.kill(pid, signal or _signal.SIGTERM)
  37. def grow(self, n=1):
  38. return self._pool.grow(n)
  39. def shrink(self, n=1):
  40. return self._pool.shrink(n)
  41. def _get_info(self):
  42. return {"max-concurrency": self.limit,
  43. "processes": [p.pid for p in self._pool._pool],
  44. "max-tasks-per-child": self._pool._maxtasksperchild,
  45. "put-guarded-by-semaphore": self.putlocks,
  46. "timeouts": (self._pool.soft_timeout, self._pool.timeout)}