rpc.py

# -*- coding: utf-8 -*-
"""The ``RPC`` result backend for AMQP brokers.

RPC-style result backend, using reply-to and one queue per client.
"""
from __future__ import absolute_import, unicode_literals

import time

import kombu
from kombu.common import maybe_declare
from kombu.utils.compat import register_after_fork
from kombu.utils.objects import cached_property

from celery import states
from celery._state import current_task, task_join_will_block
from celery.five import items, range

from . import base
from .async import AsyncBackendMixin, BaseResultConsumer

__all__ = ('BacklogLimitExceeded', 'RPCBackend')

E_NO_CHORD_SUPPORT = """
The "rpc" result backend does not support chords!

Note that a group chained with a task is also upgraded to be a chord,
as this pattern requires synchronization.

Result backends that support chords: Redis, Database, Memcached, and more.
"""

class BacklogLimitExceeded(Exception):
    """Too much state history to fast-forward."""


def _on_after_fork_cleanup_backend(backend):
    backend._after_fork()

class ResultConsumer(BaseResultConsumer):
    Consumer = kombu.Consumer

    _connection = None
    _consumer = None

    def __init__(self, *args, **kwargs):
        super(ResultConsumer, self).__init__(*args, **kwargs)
        self._create_binding = self.backend._create_binding

    def start(self, initial_task_id, no_ack=True, **kwargs):
        self._connection = self.app.connection()
        initial_queue = self._create_binding(initial_task_id)
        self._consumer = self.Consumer(
            self._connection.default_channel, [initial_queue],
            callbacks=[self.on_state_change], no_ack=no_ack,
            accept=self.accept)
        self._consumer.consume()

    def drain_events(self, timeout=None):
        if self._connection:
            return self._connection.drain_events(timeout=timeout)
        elif timeout:
            time.sleep(timeout)

    def stop(self):
        try:
            self._consumer.cancel()
        finally:
            self._connection.close()

    def on_after_fork(self):
        self._consumer = None
        if self._connection is not None:
            self._connection.collect()
            self._connection = None

    def consume_from(self, task_id):
        if self._consumer is None:
            return self.start(task_id)
        queue = self._create_binding(task_id)
        if not self._consumer.consuming_from(queue):
            self._consumer.add_queue(queue)
            self._consumer.consume()

    def cancel_for(self, task_id):
        if self._consumer:
            self._consumer.cancel_by_queue(self._create_binding(task_id).name)

class RPCBackend(base.Backend, AsyncBackendMixin):
    """Base class for the RPC result backend."""

    Exchange = kombu.Exchange
    Producer = kombu.Producer
    ResultConsumer = ResultConsumer

    #: Exception raised when there are too many messages for a task id.
    BacklogLimitExceeded = BacklogLimitExceeded

    persistent = False
    supports_autoexpire = True
    supports_native_join = True

    retry_policy = {
        'max_retries': 20,
        'interval_start': 0,
        'interval_step': 1,
        'interval_max': 1,
    }
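    # With this policy, publishing a result is retried up to 20 times:
    # the first retry fires immediately and subsequent retries are roughly
    # one second apart (kombu retry_policy semantics: the interval starts
    # at ``interval_start`` and grows by ``interval_step`` up to
    # ``interval_max``).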
    class Consumer(kombu.Consumer):
        """Consumer that requires manual declaration of queues."""

        auto_declare = False

    class Queue(kombu.Queue):
        """Queue that never caches declaration."""

        can_cache_declaration = False
    def __init__(self, app, connection=None, exchange=None, exchange_type=None,
                 persistent=None, serializer=None, auto_delete=True, **kwargs):
        super(RPCBackend, self).__init__(app, **kwargs)
        conf = self.app.conf
        self._connection = connection
        self._out_of_band = {}
        self.persistent = self.prepare_persistent(persistent)
        self.delivery_mode = 2 if self.persistent else 1
        exchange = exchange or conf.result_exchange
        exchange_type = exchange_type or conf.result_exchange_type
        self.exchange = self._create_exchange(
            exchange, exchange_type, self.delivery_mode,
        )
        self.serializer = serializer or conf.result_serializer
        self.auto_delete = auto_delete
        self.result_consumer = self.ResultConsumer(
            self, self.app, self.accept,
            self._pending_results, self._pending_messages,
        )
        if register_after_fork is not None:
            register_after_fork(self, _on_after_fork_cleanup_backend)

    def _after_fork(self):
        # clear state for child processes.
        self._pending_results.clear()
        self.result_consumer._after_fork()
    def _create_exchange(self, name, type='direct', delivery_mode=2):
        # uses direct to queue routing (anon exchange).
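        # The anonymous exchange (name of ``None``) is AMQP's default
        # exchange: messages are routed straight to the queue whose name
        # matches the routing key, which is why the ``name``, ``type`` and
        # ``delivery_mode`` arguments are ignored here.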
        return self.Exchange(None)

    def _create_binding(self, task_id):
        """Create new binding for task with id."""
        # RPC backend caches the binding, as one queue is used for all tasks.
        return self.binding

    def ensure_chords_allowed(self):
        raise NotImplementedError(E_NO_CHORD_SUPPORT.strip())
    def on_task_call(self, producer, task_id):
        # Called every time a task is sent when using this backend.
        # We declare the queue we receive replies on in advance of sending
        # the message, but we skip this if running in the prefork pool
        # (task_join_will_block), as we know the queue is already declared.
        if not task_join_will_block():
            maybe_declare(self.binding(producer.channel), retry=True)

    def destination_for(self, task_id, request):
        """Get the destination for result by task id.

        Returns:
            Tuple[str, str]: tuple of ``(reply_to, correlation_id)``.
        """
        # Backends didn't always receive the `request`, so we must still
        # support old code that relies on current_task.
        try:
            request = request or current_task.request
        except AttributeError:
            raise RuntimeError(
                'RPC backend missing task request for {0!r}'.format(task_id))
        return request.reply_to, request.correlation_id or task_id

    def on_reply_declare(self, task_id):
        # Return value here is used as the `declare=` argument
        # for Producer.publish.
        # By default we don't have to declare anything when sending a result.
        pass

    def on_result_fulfilled(self, result):
        # This usually cancels the queue after the result is received,
        # but we don't have to cancel since we have one queue per process.
        pass
    def as_uri(self, include_password=True):
        return 'rpc://'

    def store_result(self, task_id, result, state,
                     traceback=None, request=None, **kwargs):
        """Send task return value and state."""
        routing_key, correlation_id = self.destination_for(task_id, request)
        if not routing_key:
            return
        with self.app.amqp.producer_pool.acquire(block=True) as producer:
            producer.publish(
                self._to_result(task_id, state, result, traceback, request),
                exchange=self.exchange,
                routing_key=routing_key,
                correlation_id=correlation_id,
                serializer=self.serializer,
                retry=True, retry_policy=self.retry_policy,
                declare=self.on_reply_declare(task_id),
                delivery_mode=self.delivery_mode,
            )
        return result

    def _to_result(self, task_id, state, result, traceback, request):
        return {
            'task_id': task_id,
            'status': state,
            'result': self.encode_result(result, state),
            'traceback': traceback,
            'children': self.current_task_children(request),
        }
    def on_out_of_band_result(self, task_id, message):
        # Callback called when a reply for a task is received,
        # but we have no idea what to do with it.
        # Since the result is not pending, we put it in a separate
        # buffer: probably it will become pending later.
        if self.result_consumer:
            self.result_consumer.on_out_of_band_result(message)
        self._out_of_band[task_id] = message
    def get_task_meta(self, task_id, backlog_limit=1000):
        buffered = self._out_of_band.pop(task_id, None)
        if buffered:
            return self._set_cache_by_message(task_id, buffered)

        # Polling and using basic_get
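        # Drain everything currently waiting on the reply queue: keep only
        # the newest state seen for each task id, ack (discard) superseded
        # states, and buffer states that belong to other task ids.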
        latest_by_id = {}
        prev = None
        for acc in self._slurp_from_queue(task_id, self.accept, backlog_limit):
            tid = self._get_message_task_id(acc)
            prev, latest_by_id[tid] = latest_by_id.get(tid), acc
            if prev:
                # backends aren't expected to keep history,
                # so we delete everything except the most recent state.
                prev.ack()
                prev = None

        latest = latest_by_id.pop(task_id, None)
        for tid, msg in items(latest_by_id):
            self.on_out_of_band_result(tid, msg)

        if latest:
            latest.requeue()
            return self._set_cache_by_message(task_id, latest)
        else:
            # no new state, use previous
            try:
                return self._cache[task_id]
            except KeyError:
                # result probably pending.
                return {'status': states.PENDING, 'result': None}
    poll = get_task_meta  # XXX compat
    def _set_cache_by_message(self, task_id, message):
        payload = self._cache[task_id] = self.meta_from_decoded(
            message.payload)
        return payload

    def _slurp_from_queue(self, task_id, accept,
                          limit=1000, no_ack=False):
        with self.app.pool.acquire_channel(block=True) as (_, channel):
            binding = self._create_binding(task_id)(channel)
            binding.declare()

            for _ in range(limit):
                msg = binding.get(accept=accept, no_ack=no_ack)
                if not msg:
                    break
                yield msg
            else:
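                # ``for .. else``: only reached when the loop ran to
                # exhaustion without breaking, i.e. the queue still had
                # messages after ``limit`` fetches.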
                raise self.BacklogLimitExceeded(task_id)

    def _get_message_task_id(self, message):
        try:
            # try property first so we don't have to deserialize
            # the payload.
            return message.properties['correlation_id']
        except (AttributeError, KeyError):
            # message sent by old Celery version, need to deserialize.
            return message.payload['task_id']

    def revive(self, channel):
        pass
    def reload_task_result(self, task_id):
        raise NotImplementedError(
            'reload_task_result is not supported by this backend.')

    def reload_group_result(self, task_id):
        """Reload group result, even if it has been previously fetched."""
        raise NotImplementedError(
            'reload_group_result is not supported by this backend.')

    def save_group(self, group_id, result):
        raise NotImplementedError(
            'save_group is not supported by this backend.')

    def restore_group(self, group_id, cache=True):
        raise NotImplementedError(
            'restore_group is not supported by this backend.')

    def delete_group(self, group_id):
        raise NotImplementedError(
            'delete_group is not supported by this backend.')
    def __reduce__(self, args=(), kwargs={}):
        return super(RPCBackend, self).__reduce__(args, dict(
            kwargs,
            connection=self._connection,
            exchange=self.exchange.name,
            exchange_type=self.exchange.type,
            persistent=self.persistent,
            serializer=self.serializer,
            auto_delete=self.auto_delete,
            expires=self.expires,
        ))

    @property
    def binding(self):
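        # One reply queue per client: both the queue name and the routing
        # key are this app instance's OID, so all replies destined for this
        # process arrive on a single queue.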
        return self.Queue(
            self.oid, self.exchange, self.oid,
            durable=False,
            auto_delete=True,
            expires=self.expires,
        )

    @cached_property
    def oid(self):
        # Cached here: the app OID, i.e. the name of the queue
        # we receive results on.
        return self.app.oid