async.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. """
  2. ``celery.backends.async``
  3. ~~~~~~~~~~~~~~~~~~~~~~~~~
  4. Async backend support utilities.
  5. """
  6. from __future__ import absolute_import, unicode_literals
  7. import socket
  8. from collections import deque
  9. from time import sleep
  10. from weakref import WeakKeyDictionary
  11. from kombu.syn import detect_environment
  12. from kombu.utils import cached_property
  13. from celery import states
  14. from celery.exceptions import TimeoutError
  15. from celery.five import monotonic
  16. drainers = {}
  17. def register_drainer(name):
  18. def _inner(cls):
  19. drainers[name] = cls
  20. return cls
  21. return _inner
  22. @register_drainer('default')
  23. class Drainer(object):
  24. def __init__(self, result_consumer):
  25. self.result_consumer = result_consumer
  26. def drain_events_until(self, p, timeout=None, on_interval=None,
  27. monotonic=monotonic, wait=None):
  28. wait = wait or self.result_consumer.drain_events
  29. time_start = monotonic()
  30. while 1:
  31. # Total time spent may exceed a single call to wait()
  32. if timeout and monotonic() - time_start >= timeout:
  33. raise socket.timeout()
  34. try:
  35. yield self.wait_for(p, wait, timeout=1)
  36. except socket.timeout:
  37. pass
  38. if on_interval:
  39. on_interval()
  40. if p.ready: # got event on the wanted channel.
  41. break
  42. def wait_for(self, p, wait, timeout=None):
  43. wait(timeout=timeout)
  44. class greenletDrainer(Drainer):
  45. spawn = None
  46. _g = None
  47. _stopped = False
  48. def run(self):
  49. while not self._stopped:
  50. try:
  51. self.result_consumer.drain_events(timeout=1)
  52. except socket.timeout:
  53. pass
  54. def start(self):
  55. if self._g is None:
  56. self._g = self.spawn(self.run)
  57. def stop(self):
  58. self._stopped = True
  59. def wait_for(self, p, wait, timeout=None):
  60. if self._g is None:
  61. self.start()
  62. if not p.ready:
  63. sleep(0)
  64. @register_drainer('eventlet')
  65. class eventletDrainer(greenletDrainer):
  66. @cached_property
  67. def spawn(self):
  68. from eventlet import spawn
  69. return spawn
  70. @register_drainer('gevent')
  71. class geventDrainer(greenletDrainer):
  72. @cached_property
  73. def spawn(self):
  74. from gevent import spawn
  75. return spawn
  76. class AsyncBackendMixin(object):
  77. def _collect_into(self, result, bucket):
  78. self.result_consumer.buckets[result] = bucket
  79. def iter_native(self, result, no_ack=True, **kwargs):
  80. self._ensure_not_eager()
  81. results = result.results
  82. if not results:
  83. raise StopIteration()
  84. bucket = deque()
  85. for node in results:
  86. if node._cache:
  87. bucket.append(node)
  88. else:
  89. self._collect_into(node, bucket)
  90. for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
  91. while bucket:
  92. node = bucket.popleft()
  93. yield node.id, node._cache
  94. while bucket:
  95. node = bucket.popleft()
  96. yield node.id, node._cache
  97. def add_pending_result(self, result, weak=False):
  98. if weak:
  99. dest, alt = self._weak_pending_results, self._pending_results
  100. else:
  101. dest, alt = self._pending_results, self._weak_pending_results
  102. if result.id not in dest and result.id not in alt:
  103. dest[result.id] = result
  104. self.result_consumer.consume_from(result.id)
  105. return result
  106. def add_pending_results(self, results, weak=False):
  107. return [self.add_pending_result(result, weak=weak)
  108. for result in results]
  109. def remove_pending_result(self, result):
  110. self._pending_results.pop(result.id, None)
  111. self._weak_pending_results.pop(result.id, None)
  112. self.on_result_fulfilled(result)
  113. return result
  114. def on_result_fulfilled(self, result):
  115. self.result_consumer.cancel_for(result.id)
  116. def wait_for_pending(self, result,
  117. callback=None, propagate=True, **kwargs):
  118. self._ensure_not_eager()
  119. for _ in self._wait_for_pending(result, **kwargs):
  120. pass
  121. return result.maybe_throw(callback=callback, propagate=propagate)
  122. def _wait_for_pending(self, result,
  123. timeout=None, on_interval=None, on_message=None,
  124. **kwargs):
  125. return self.result_consumer._wait_for_pending(
  126. result, timeout=timeout,
  127. on_interval=on_interval, on_message=on_message,
  128. )
  129. @property
  130. def is_async(self):
  131. return True
  132. class BaseResultConsumer(object):
  133. def __init__(self, backend, app, accept, pending_results,
  134. weak_pending_results):
  135. self.backend = backend
  136. self.app = app
  137. self.accept = accept
  138. self._pending_results = pending_results
  139. self._weak_pending_results = weak_pending_results
  140. self.on_message = None
  141. self.buckets = WeakKeyDictionary()
  142. self.drainer = drainers[detect_environment()](self)
  143. def start(self):
  144. raise NotImplementedError()
  145. def stop(self):
  146. pass
  147. def drain_events(self, timeout=None):
  148. raise NotImplementedError()
  149. def consume_from(self, task_id):
  150. raise NotImplementedError()
  151. def cancel_for(self, task_id):
  152. raise NotImplementedError()
  153. def _after_fork(self):
  154. self.buckets.clear()
  155. self.buckets = WeakKeyDictionary()
  156. self.on_message = None
  157. self.on_after_fork()
  158. def on_after_fork(self):
  159. pass
  160. def drain_events_until(self, p, timeout=None, on_interval=None):
  161. return self.drainer.drain_events_until(
  162. p, timeout=timeout, on_interval=on_interval)
  163. def _wait_for_pending(self, result,
  164. timeout=None, on_interval=None, on_message=None,
  165. **kwargs):
  166. self.on_wait_for_pending(result, timeout=timeout, **kwargs)
  167. prev_on_m, self.on_message = self.on_message, on_message
  168. try:
  169. for _ in self.drain_events_until(
  170. result.on_ready, timeout=timeout,
  171. on_interval=on_interval):
  172. yield
  173. sleep(0)
  174. except socket.timeout:
  175. raise TimeoutError('The operation timed out.')
  176. finally:
  177. self.on_message = prev_on_m
  178. def on_wait_for_pending(self, result, timeout=None, **kwargs):
  179. pass
  180. def on_out_of_band_result(self, message):
  181. self.on_state_change(message.payload, message)
  182. def on_state_change(self, meta, message):
  183. if self.on_message:
  184. self.on_message(meta)
  185. if meta['status'] in states.READY_STATES:
  186. try:
  187. result = self._pending_results[meta['task_id']]
  188. except KeyError:
  189. try:
  190. result = self._weak_pending_results[meta['task_id']]
  191. except KeyError:
  192. return
  193. result._maybe_set_cache(meta)
  194. buckets = self.buckets
  195. try:
  196. buckets.pop(result)
  197. except KeyError:
  198. pass
  199. sleep(0)