# evg.py (782 B)
  1. import os
  2. from gevent import monkey
  3. if not os.environ.get("GEVENT_NOPATCH"):
  4. monkey.patch_all()
  5. from gevent import Greenlet
  6. from gevent.pool import Pool
  7. from celery.concurrency.base import apply_target, BasePool
  8. class TaskPool(BasePool):
  9. Pool = Pool
  10. signal_safe = False
  11. def on_start(self):
  12. self._pool = self.Pool(self.limit)
  13. def on_stop(self):
  14. if self._pool is not None:
  15. self._pool.join()
  16. def on_apply(self, target, args=None, kwargs=None, callback=None,
  17. accept_callback=None, **_):
  18. return self._pool.spawn(apply_target, target, args, kwargs,
  19. callback, accept_callback)
  20. def blocking(self, fun, *args, **kwargs):
  21. Greenlet.spawn(fun, *args, **kwargs).get()