async.py

  1. """Async I/O backend support utilities."""
  2. import socket
  3. from collections import deque
  4. from time import monotonic, sleep
  5. from weakref import WeakKeyDictionary
  6. from queue import Empty
  7. from kombu.syn import detect_environment
  8. from kombu.utils.objects import cached_property
  9. from celery import states
  10. from celery.exceptions import TimeoutError

drainers = {}


def register_drainer(name):
    def _inner(cls):
        drainers[name] = cls
        return cls
    return _inner
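
# ``drainers`` maps an environment name (as returned by
# ``detect_environment()``) to a Drainer class.  ``register_drainer`` is the
# decorator that fills this mapping; ``BaseResultConsumer`` below picks the
# drainer matching the current environment ('default', 'eventlet' or
# 'gevent') when it is constructed.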


@register_drainer('default')
class Drainer:

    def __init__(self, result_consumer):
        self.result_consumer = result_consumer

    def drain_events_until(self, p, timeout=None, on_interval=None,
                           monotonic=monotonic, wait=None):
        wait = wait or self.result_consumer.drain_events
        time_start = monotonic()

        while 1:
            # Total time spent may exceed a single call to wait()
            if timeout and monotonic() - time_start >= timeout:
                raise socket.timeout()
            try:
                yield self.wait_for(p, wait, timeout=1)
            except socket.timeout:
                pass
            if on_interval:
                on_interval()
            if p.ready:  # got event on the wanted channel.
                break

    def wait_for(self, p, wait, timeout=None):
        wait(timeout=timeout)
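
# ``Drainer.drain_events_until`` is a generator: each iteration waits at most
# one second for new result messages, calls ``on_interval`` if given, and
# stops once the promise ``p`` is ready or the overall ``timeout`` has
# elapsed (in which case ``socket.timeout`` propagates to the caller).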


class greenletDrainer(Drainer):
    spawn = None
    _g = None
    _stopped = False

    def run(self):
        while not self._stopped:
            try:
                self.result_consumer.drain_events(timeout=1)
            except socket.timeout:
                pass

    def start(self):
        if self._g is None:
            self._g = self.spawn(self.run)

    def stop(self):
        self._stopped = True

    def wait_for(self, p, wait, timeout=None):
        if self._g is None:
            self.start()
        if not p.ready:
            sleep(0)
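
# ``greenletDrainer`` moves the drain loop into a background green thread:
# ``start()`` spawns ``run()`` once, and ``wait_for()`` merely yields control
# with ``sleep(0)`` so the spawned greenlet gets a chance to receive results.
# Subclasses only have to supply ``spawn``.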


@register_drainer('eventlet')
class eventletDrainer(greenletDrainer):

    @cached_property
    def spawn(self):
        from eventlet import spawn
        return spawn


@register_drainer('gevent')
class geventDrainer(greenletDrainer):

    @cached_property
    def spawn(self):
        from gevent import spawn
        return spawn
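
# Third-party environments can plug in their own drainer the same way.
# A minimal, hypothetical sketch ('mygreenlib' is illustrative only, not
# part of Celery or kombu):
#
#     @register_drainer('mygreenlib')
#     class MyGreenlibDrainer(greenletDrainer):
#
#         @cached_property
#         def spawn(self):
#             from mygreenlib import spawn
#             return spawn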


class AsyncBackendMixin:

    def _collect_into(self, result, bucket):
        self.result_consumer.buckets[result] = bucket

    def iter_native(self, result, no_ack=True, **kwargs):
        self._ensure_not_eager()

        results = result.results
        if not results:
            # nothing to wait for: finish the generator immediately
            # (raising StopIteration inside a generator is an error
            # under PEP 479 / Python 3.7+).
            return

        # tell the result consumer to put consumed results into this bucket.
        bucket = deque()
        for node in results:
            if node._cache:
                bucket.append(node)
            else:
                self._collect_into(node, bucket)

        for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
            while bucket:
                node = bucket.popleft()
                yield node.id, node._cache
        while bucket:
            node = bucket.popleft()
            yield node.id, node._cache

    def add_pending_result(self, result, weak=False):
        try:
            self._maybe_resolve_from_buffer(result)
        except Empty:
            self._add_pending_result(result.id, result, weak=weak)
        return result

    def _maybe_resolve_from_buffer(self, result):
        result._maybe_set_cache(self._pending_messages.take(result.id))

    def _add_pending_result(self, task_id, result, weak=False):
        # _pending_results is a (concrete, weak) pair of mappings;
        # don't shadow the ``weak`` flag when unpacking it.
        concrete, weak_ = self._pending_results
        if task_id not in weak_ and result.id not in concrete:
            (weak_ if weak else concrete)[task_id] = result
            self.result_consumer.consume_from(task_id)

    def add_pending_results(self, results, weak=False):
        return [self.add_pending_result(result, weak=weak)
                for result in results]

    def remove_pending_result(self, result):
        self._remove_pending_result(result.id)
        self.on_result_fulfilled(result)
        return result

    def _remove_pending_result(self, task_id):
        for mapping in self._pending_results:
            mapping.pop(task_id, None)

    def on_result_fulfilled(self, result):
        self.result_consumer.cancel_for(result.id)

    def wait_for_pending(self, result,
                         callback=None, propagate=True, **kwargs):
        self._ensure_not_eager()
        for _ in self._wait_for_pending(result, **kwargs):
            pass
        return result.maybe_throw(callback=callback, propagate=propagate)

    def _wait_for_pending(self, result,
                          timeout=None, on_interval=None, on_message=None,
                          **kwargs):
        return self.result_consumer._wait_for_pending(
            result, timeout=timeout,
            on_interval=on_interval, on_message=on_message,
        )

    @property
    def is_async(self):
        return True
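
# ``AsyncBackendMixin`` is mixed into result backends that can consume
# results asynchronously (e.g. the RPC and Redis backends).  It tracks
# unresolved results in the ``_pending_results`` pair of concrete/weak
# mappings, buffers results that arrive before the caller registered for
# them (``_pending_messages``), and delegates the actual waiting to
# ``result_consumer._wait_for_pending``.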


class BaseResultConsumer:

    def __init__(self, backend, app, accept,
                 pending_results, pending_messages):
        self.backend = backend
        self.app = app
        self.accept = accept
        self._pending_results = pending_results
        self._pending_messages = pending_messages
        self.on_message = None
        self.buckets = WeakKeyDictionary()
        self.drainer = drainers[detect_environment()](self)

    def start(self):
        raise NotImplementedError()

    def stop(self):
        pass

    def drain_events(self, timeout=None):
        raise NotImplementedError()

    def consume_from(self, task_id):
        raise NotImplementedError()

    def cancel_for(self, task_id):
        raise NotImplementedError()

    def _after_fork(self):
        self.buckets.clear()
        self.buckets = WeakKeyDictionary()
        self.on_message = None
        self.on_after_fork()

    def on_after_fork(self):
        pass

    def drain_events_until(self, p, timeout=None, on_interval=None):
        return self.drainer.drain_events_until(
            p, timeout=timeout, on_interval=on_interval)

    def _wait_for_pending(self, result,
                          timeout=None, on_interval=None, on_message=None,
                          **kwargs):
        self.on_wait_for_pending(result, timeout=timeout, **kwargs)
        prev_on_m, self.on_message = self.on_message, on_message
        try:
            for _ in self.drain_events_until(
                    result.on_ready, timeout=timeout,
                    on_interval=on_interval):
                yield
                sleep(0)
        except socket.timeout:
            raise TimeoutError('The operation timed out.')
        finally:
            self.on_message = prev_on_m

    def on_wait_for_pending(self, result, timeout=None, **kwargs):
        pass

    def on_out_of_band_result(self, message):
        self.on_state_change(message.payload, message)

    def _get_pending_result(self, task_id):
        for mapping in self._pending_results:
            try:
                return mapping[task_id]
            except KeyError:
                pass
        raise KeyError(task_id)

    def on_state_change(self, meta, message):
        if self.on_message:
            self.on_message(meta)
        if meta['status'] in states.READY_STATES:
            task_id = meta['task_id']
            try:
                result = self._get_pending_result(task_id)
            except KeyError:
                # send to buffer in case we received this result
                # before it was added to _pending_results.
                self._pending_messages.put(task_id, meta)
            else:
                result._maybe_set_cache(meta)
                buckets = self.buckets
                try:
                    # remove the bucket for this result, since it's fulfilled
                    bucket = buckets.pop(result)
                except KeyError:
                    pass
                else:
                    # hand the fulfilled result to the waiter via its bucket
                    bucket.append(result)
        sleep(0)
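
# Flow of a native join: ``iter_native`` creates one deque and registers it
# for every unfinished node via ``_collect_into``; when ``on_state_change``
# sees a result in a READY state it resolves the matching AsyncResult, pops
# that node's bucket and appends the result to it, so ``iter_native`` can
# yield ``(task_id, meta)`` pairs as soon as they arrive.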