eventlet.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import os
  4. if not os.environ.get("EVENTLET_NOPATCH"):
  5. import eventlet
  6. import eventlet.debug
  7. eventlet.monkey_patch()
  8. eventlet.debug.hub_prevent_multiple_readers(False)
  9. from time import time
  10. from celery import signals
  11. from celery.utils import timer2
  12. from . import base
  13. def apply_target(target, args=(), kwargs={}, callback=None,
  14. accept_callback=None, getpid=None):
  15. return base.apply_target(target, args, kwargs, callback, accept_callback,
  16. pid=getpid())
  17. class Schedule(timer2.Schedule):
  18. def __init__(self, *args, **kwargs):
  19. from eventlet.greenthread import spawn_after
  20. from greenlet import GreenletExit
  21. super(Schedule, self).__init__(*args, **kwargs)
  22. self.GreenletExit = GreenletExit
  23. self._spawn_after = spawn_after
  24. self._queue = set()
  25. def _enter(self, eta, priority, entry):
  26. secs = max(eta - time(), 0)
  27. g = self._spawn_after(secs, entry)
  28. self._queue.add(g)
  29. g.link(self._entry_exit, entry)
  30. g.entry = entry
  31. g.eta = eta
  32. g.priority = priority
  33. g.cancelled = False
  34. return g
  35. def _entry_exit(self, g, entry):
  36. try:
  37. try:
  38. g.wait()
  39. except self.GreenletExit:
  40. entry.cancel()
  41. g.cancelled = True
  42. finally:
  43. self._queue.discard(g)
  44. def clear(self):
  45. queue = self._queue
  46. while queue:
  47. try:
  48. queue.pop().cancel()
  49. except (KeyError, self.GreenletExit):
  50. pass
  51. @property
  52. def queue(self):
  53. return [(g.eta, g.priority, g.entry) for g in self._queue]
  54. class Timer(timer2.Timer):
  55. Schedule = Schedule
  56. def ensure_started(self):
  57. pass
  58. def stop(self):
  59. self.schedule.clear()
  60. def cancel(self, tref):
  61. try:
  62. tref.cancel()
  63. except self.schedule.GreenletExit:
  64. pass
  65. def start(self):
  66. pass
  67. class TaskPool(base.BasePool):
  68. Timer = Timer
  69. rlimit_safe = False
  70. signal_safe = False
  71. is_green = True
  72. def __init__(self, *args, **kwargs):
  73. from eventlet import greenthread
  74. from eventlet.greenpool import GreenPool
  75. self.Pool = GreenPool
  76. self.getcurrent = greenthread.getcurrent
  77. self.getpid = lambda: id(greenthread.getcurrent())
  78. self.spawn_n = greenthread.spawn_n
  79. super(TaskPool, self).__init__(*args, **kwargs)
  80. def on_start(self):
  81. self._pool = self.Pool(self.limit)
  82. signals.eventlet_pool_started.send(sender=self)
  83. def on_stop(self):
  84. signals.eventlet_pool_preshutdown.send(sender=self)
  85. if self._pool is not None:
  86. self._pool.waitall()
  87. signals.eventlet_pool_postshutdown.send(sender=self)
  88. def on_apply(self, target, args=None, kwargs=None, callback=None,
  89. accept_callback=None, **_):
  90. signals.eventlet_pool_apply.send(sender=self,
  91. target=target, args=args, kwargs=kwargs)
  92. self._pool.spawn_n(apply_target, target, args, kwargs,
  93. callback, accept_callback,
  94. self.getpid)