# -*- coding: utf-8 -*-
"""
``celery.backends.amqp``
~~~~~~~~~~~~~~~~~~~~~~~~

The AMQP result backend.

This backend publishes results as messages.

"""
from __future__ import absolute_import, unicode_literals

from kombu import Exchange, Queue, Producer, Consumer
from kombu.utils import register_after_fork

from celery import states
from celery.five import range
from celery.utils.functional import dictfilter
from celery.utils.log import get_logger
from celery.utils.timeutils import maybe_s_to_ms

from . import base
from .async import AsyncBackendMixin, BaseResultConsumer

__all__ = ['BacklogLimitExceeded', 'AMQPBackend']

logger = get_logger(__name__)
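
# Usage sketch (not part of this module; assumes a running AMQP broker,
# and illustrative names):
#
#     from celery import Celery
#     app = Celery('proj', broker='amqp://', backend='amqp://')
#
#     @app.task
#     def add(x, y):
#         return x + y
#
#     add.delay(2, 2).get(timeout=5)  # consumed from a per-task result queue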


class BacklogLimitExceeded(Exception):
    """Too much state history to fast-forward."""


def repair_uuid(s):
    # Historically the dashes in UUIDs are removed from AMQ entity names,
    # but there is no known reason to do so.  Hopefully we'll be able to
    # fix this in v4.0.
    return '%s-%s-%s-%s-%s' % (s[:8], s[8:12], s[12:16], s[16:20], s[20:])
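
# Example (hypothetical id):
#     repair_uuid('2a760d6c61b8467a8bf53b9df8a16f4c')
#     -> '2a760d6c-61b8-467a-8bf5-3b9df8a16f4c'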


def _on_after_fork_cleanup_backend(backend):
    backend._after_fork()


class NoCacheQueue(Queue):
    # Result queues are auto_delete and may expire at any time, so their
    # declarations must be repeated rather than cached.
    can_cache_declaration = False


class ResultConsumer(BaseResultConsumer):
    Consumer = Consumer

    _connection = None
    _consumer = None

    def __init__(self, *args, **kwargs):
        super(ResultConsumer, self).__init__(*args, **kwargs)
        self._create_binding = self.backend._create_binding

    def start(self, initial_task_id, no_ack=True):
        self._connection = self.app.connection()
        initial_queue = self._create_binding(initial_task_id)
        self._consumer = self.Consumer(
            self._connection.default_channel, [initial_queue],
            callbacks=[self.on_state_change], no_ack=no_ack,
            accept=self.accept)
        self._consumer.consume()

    def drain_events(self, timeout=None):
        return self._connection.drain_events(timeout=timeout)

    def stop(self):
        try:
            self._consumer.cancel()
        finally:
            self._connection.close()

    def on_after_fork(self):
        # Connections cannot be shared across a fork: drop the consumer
        # and collect the connection so the child process starts fresh.
        self._consumer = None
        if self._connection is not None:
            self._connection.collect()
            self._connection = None

    def consume_from(self, task_id):
        if self._consumer is None:
            return self.start(task_id)
        queue = self._create_binding(task_id)
        if not self._consumer.consuming_from(queue):
            self._consumer.add_queue(queue)
            self._consumer.consume()

    def cancel_for(self, task_id):
        if self._consumer:
            self._consumer.cancel_by_queue(self._create_binding(task_id).name)


class AMQPBackend(base.Backend, AsyncBackendMixin):
    """Publishes results by sending messages."""

    Exchange = Exchange
    Queue = NoCacheQueue
    Consumer = Consumer
    Producer = Producer
    ResultConsumer = ResultConsumer

    BacklogLimitExceeded = BacklogLimitExceeded

    persistent = True
    supports_autoexpire = True
    supports_native_join = True

    retry_policy = {
        'max_retries': 20,
        'interval_start': 0,
        'interval_step': 1,
        'interval_max': 1,
    }

    def __init__(self, app, connection=None, exchange=None, exchange_type=None,
                 persistent=None, serializer=None, auto_delete=True, **kwargs):
        super(AMQPBackend, self).__init__(app, **kwargs)
        conf = self.app.conf
        self._connection = connection
        self._out_of_band = {}
        self.persistent = self.prepare_persistent(persistent)
        self.delivery_mode = 2 if self.persistent else 1
        exchange = exchange or conf.result_exchange
        exchange_type = exchange_type or conf.result_exchange_type
        self.exchange = self._create_exchange(
            exchange, exchange_type, self.delivery_mode,
        )
        self.serializer = serializer or conf.result_serializer
        self.auto_delete = auto_delete
        self.queue_arguments = dictfilter({
            'x-expires': maybe_s_to_ms(self.expires),
        })
        self.result_consumer = self.ResultConsumer(
            self, self.app, self.accept, self._pending_results)
        if register_after_fork is not None:
            register_after_fork(self, _on_after_fork_cleanup_backend)
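
    # Configuration sketch: the settings consulted above can be set on the
    # app; the values below are illustrative assumptions, not defaults
    # taken from this file:
    #
    #     app.conf.result_exchange = 'celeryresults'
    #     app.conf.result_exchange_type = 'direct'
    #     app.conf.result_serializer = 'json'
    #     app.conf.result_expires = 3600  # -> x-expires of 3600000 ms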

    def _after_fork(self):
        self._pending_results.clear()
        self.result_consumer._after_fork()

    def _create_exchange(self, name, type='direct', delivery_mode=2):
        return self.Exchange(name=name,
                             type=type,
                             delivery_mode=delivery_mode,
                             durable=self.persistent,
                             auto_delete=False)

    def _create_binding(self, task_id):
        name = self.rkey(task_id)
        return self.Queue(name=name,
                          exchange=self.exchange,
                          routing_key=name,
                          durable=self.persistent,
                          auto_delete=self.auto_delete,
                          queue_arguments=self.queue_arguments)

    def revive(self, channel):
        pass

    def rkey(self, task_id):
        return task_id.replace('-', '')

    def destination_for(self, task_id, request):
        if request:
            return self.rkey(task_id), request.correlation_id or task_id
        return self.rkey(task_id), task_id
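
    # For example (hypothetical id), without a request the queue/routing
    # key is the dash-less task id and the correlation id is the task id
    # itself:
    #
    #     destination_for('2a760d6c-61b8-467a-8bf5-3b9df8a16f4c', None)
    #     -> ('2a760d6c61b8467a8bf53b9df8a16f4c',
    #         '2a760d6c-61b8-467a-8bf5-3b9df8a16f4c')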

    def store_result(self, task_id, result, state,
                     traceback=None, request=None, **kwargs):
        """Send task return value and state."""
        routing_key, correlation_id = self.destination_for(task_id, request)
        if not routing_key:
            return
        with self.app.amqp.producer_pool.acquire(block=True) as producer:
            producer.publish(
                {'task_id': task_id, 'status': state,
                 'result': self.encode_result(result, state),
                 'traceback': traceback,
                 'children': self.current_task_children(request)},
                exchange=self.exchange,
                routing_key=routing_key,
                correlation_id=correlation_id,
                serializer=self.serializer,
                retry=True, retry_policy=self.retry_policy,
                declare=self.on_reply_declare(task_id),
                delivery_mode=self.delivery_mode,
            )
        return result
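
    # The message body published above looks like, e.g. (hypothetical
    # values; ``result`` is whatever ``encode_result()`` produced for the
    # given state):
    #
    #     {'task_id': '2a760d6c-61b8-467a-8bf5-3b9df8a16f4c',
    #      'status': 'SUCCESS', 'result': 4, 'traceback': None,
    #      'children': []}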

    def on_reply_declare(self, task_id):
        return [self._create_binding(task_id)]

    def on_out_of_band_result(self, task_id, message):
        if self.result_consumer:
            self.result_consumer.on_out_of_band_result(message)
        self._out_of_band[task_id] = message

    def get_task_meta(self, task_id, backlog_limit=1000):
        try:
            buffered = self._out_of_band.pop(task_id)
        except KeyError:
            pass
        else:
            payload = self._cache[task_id] = self.meta_from_decoded(
                buffered.payload)
            return payload

        # Polling and using basic_get
        with self.app.pool.acquire_channel(block=True) as (_, channel):
            binding = self._create_binding(task_id)(channel)
            binding.declare()

            prev = latest = acc = None
            for i in range(backlog_limit):  # spool ffwd
                acc = binding.get(
                    accept=self.accept, no_ack=False,
                )
                if not acc:  # no more messages
                    break
                try:
                    message_task_id = acc.properties['correlation_id']
                except (AttributeError, KeyError):
                    message_task_id = acc.payload['task_id']
                if message_task_id == task_id:
                    prev, latest = latest, acc
                    if prev:
                        # backends are not expected to keep history,
                        # so we delete everything except the most recent state.
                        prev.ack()
                        prev = None
                else:
                    self.on_out_of_band_result(message_task_id, acc)
            else:
                raise self.BacklogLimitExceeded(task_id)

            if latest:
                payload = self._cache[task_id] = self.meta_from_decoded(
                    latest.payload)
                latest.requeue()
                return payload
            else:
                # no new state, use previous
                try:
                    return self._cache[task_id]
                except KeyError:
                    # result probably pending.
                    return {'status': states.PENDING, 'result': None}
    poll = get_task_meta  # XXX compat
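
    # Polling sketch (hypothetical id):
    #
    #     meta = app.backend.get_task_meta('2a760d6c-...-f4c')
    #     meta['status']  # 'PENDING' until a result has been published
    #
    # Since only the newest state is requeued, a result can effectively be
    # consumed by only one client at a time; a second consumer competing
    # for the same queue would steal the message.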

    def reload_task_result(self, task_id):
        raise NotImplementedError(
            'reload_task_result is not supported by this backend.')

    def reload_group_result(self, task_id):
        """Reload group result, even if it has been previously fetched."""
        raise NotImplementedError(
            'reload_group_result is not supported by this backend.')

    def save_group(self, group_id, result):
        raise NotImplementedError(
            'save_group is not supported by this backend.')

    def restore_group(self, group_id, cache=True):
        raise NotImplementedError(
            'restore_group is not supported by this backend.')

    def delete_group(self, group_id):
        raise NotImplementedError(
            'delete_group is not supported by this backend.')

    def as_uri(self, include_password=True):
        return 'amqp://'

    def __reduce__(self, args=(), kwargs=None):
        # NOTE: a mutable ``{}`` default would be shared (and mutated)
        # across calls, so create a fresh dict instead.
        kwargs = {} if kwargs is None else kwargs
        kwargs.update(
            connection=self._connection,
            exchange=self.exchange.name,
            exchange_type=self.exchange.type,
            persistent=self.persistent,
            serializer=self.serializer,
            auto_delete=self.auto_delete,
            expires=self.expires,
        )
        return super(AMQPBackend, self).__reduce__(args, kwargs)