evg.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import os
  2. import sys
  3. from time import time
  4. if not os.environ.get("GEVENT_NOPATCH"):
  5. from gevent import monkey
  6. monkey.patch_all()
  7. from celery.concurrency.base import apply_target, BasePool
  8. from celery.utils import timer2
  9. class Schedule(timer2.Schedule):
  10. def __init__(self, *args, **kwargs):
  11. from gevent.greenlet import Greenlet, GreenletExit
  12. class _Greenlet(Greenlet):
  13. def cancel(self):
  14. self.kill()
  15. self._Greenlet = _Greenlet
  16. self._GreenletExit = GreenletExit
  17. super(Schedule, self).__init__(*args, **kwargs)
  18. self._queue = set()
  19. def enter(self, entry, eta=None, priority=0):
  20. try:
  21. eta = timer2.to_timestamp(eta)
  22. except OverflowError:
  23. if not self.handle_error(sys.exc_info()):
  24. raise
  25. now = time()
  26. if eta is None:
  27. eta = now
  28. secs = max(eta - now, 0)
  29. g = self._Greenlet.spawn_later(secs, entry)
  30. self._queue.add(g)
  31. g.link(self._entry_exit)
  32. g.entry = entry
  33. g.eta = eta
  34. g.priority = priority
  35. g.cancelled = False
  36. return g
  37. def _entry_exit(self, g):
  38. try:
  39. g.kill()
  40. finally:
  41. self._queue.discard(g)
  42. def clear(self):
  43. queue = self._queue
  44. while queue:
  45. try:
  46. queue.pop().kill()
  47. except KeyError:
  48. pass
  49. @property
  50. def queue(self):
  51. return [(g.eta, g.priority, g.entry) for g in self._queue]
  52. class Timer(timer2.Timer):
  53. Schedule = Schedule
  54. def ensure_started(self):
  55. pass
  56. def stop(self):
  57. self.schedule.clear()
  58. def start(self):
  59. pass
  60. class TaskPool(BasePool):
  61. Timer = Timer
  62. signal_safe = False
  63. is_green = True
  64. def __init__(self, *args, **kwargs):
  65. from gevent import spawn_raw
  66. from gevent.pool import Pool
  67. self.Pool = Pool
  68. self.spawn_n = spawn_raw
  69. super(TaskPool, self).__init__(*args, **kwargs)
  70. def on_start(self):
  71. self._pool = self.Pool(self.limit)
  72. def on_stop(self):
  73. if self._pool is not None:
  74. self._pool.join()
  75. def on_apply(self, target, args=None, kwargs=None, callback=None,
  76. accept_callback=None, **_):
  77. return self._pool.spawn(apply_target, target, args, kwargs,
  78. callback, accept_callback)