# -*- coding: utf-8 -*-
import socket
import time

from datetime import timedelta

from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer, Producer

from celery import states
from celery.backends.base import BaseDictBackend
from celery.exceptions import TimeoutError
from celery.utils import timeutils
from celery.utils import cached_property


def repair_uuid(s):
    # Historically the dashes in UUIDs were removed from AMQ entity names,
    # but there is no known reason to do so.  Hopefully we'll be able to
    # fix this in v3.0.
    return "%s-%s-%s-%s-%s" % (s[:8], s[8:12], s[12:16], s[16:20], s[20:])
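
# A minimal doctest-style sketch of what repair_uuid restores, using a
# hypothetical 32-character hex id as produced by uuid.uuid4().hex:
#
#     >>> repair_uuid("390230c0b7b84b07930a6e61b6968cbb")
#     '390230c0-b7b8-4b07-930a-6e61b6968cbb'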


class AMQPBackend(BaseDictBackend):
    """Publishes results by sending messages."""
    Exchange = Exchange
    Queue = Queue
    Consumer = Consumer
    Producer = Producer

    _pool = None

    def __init__(self, connection=None, exchange=None, exchange_type=None,
            persistent=None, serializer=None, auto_delete=True,
            expires=None, connection_max=None, **kwargs):
        super(AMQPBackend, self).__init__(**kwargs)
        conf = self.app.conf
        self._connection = connection
        self.queue_arguments = {}

        if persistent is None:
            persistent = conf.CELERY_RESULT_PERSISTENT
        self.persistent = persistent

        delivery_mode = persistent and "persistent" or "transient"
        exchange = exchange or conf.CELERY_RESULT_EXCHANGE
        exchange_type = exchange_type or conf.CELERY_RESULT_EXCHANGE_TYPE
        self.exchange = self.Exchange(name=exchange,
                                      type=exchange_type,
                                      delivery_mode=delivery_mode,
                                      durable=self.persistent,
                                      auto_delete=auto_delete)
        self.serializer = serializer or conf.CELERY_RESULT_SERIALIZER
        self.auto_delete = auto_delete

        self.expires = expires
        if self.expires is None:
            self.expires = conf.CELERY_AMQP_TASK_RESULT_EXPIRES
        if isinstance(self.expires, timedelta):
            self.expires = timeutils.timedelta_seconds(self.expires)
        if self.expires is not None:
            self.expires = int(self.expires)
            # Requires RabbitMQ 2.1.0 or higher.
            self.queue_arguments["x-expires"] = int(self.expires * 1000.0)

        self.connection_max = (connection_max or
                               conf.CELERY_AMQP_TASK_RESULT_CONNECTION_MAX)
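
    # Illustrative note (editorial, not in the original source): with, say,
    # expires=timedelta(days=1), timedelta_seconds() yields 86400, so the
    # result queues are declared with queue_arguments={"x-expires": 86400000},
    # the millisecond TTL expected by RabbitMQ's queue-expiry extension.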

    def _create_binding(self, task_id):
        name = task_id.replace("-", "")
        return self.Queue(name=name,
                          exchange=self.exchange,
                          routing_key=name,
                          durable=self.persistent,
                          auto_delete=self.auto_delete,
                          queue_arguments=self.queue_arguments)

    def _create_producer(self, task_id, channel):
        self._create_binding(task_id)(channel).declare()
        return self.Producer(channel, exchange=self.exchange,
                             routing_key=task_id.replace("-", ""),
                             serializer=self.serializer)

    def _create_consumer(self, bindings, channel):
        return self.Consumer(channel, bindings, no_ack=True)
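
    # Design note (editorial, not in the original source): every task gets
    # its own queue, named after the task id with the dashes stripped, and
    # bound to the shared result exchange under that same routing key.
    # E.g. a hypothetical id "390230c0-b7b8-4b07-930a-6e61b6968cbb" maps to
    # the queue/routing key "390230c0b7b84b07930a6e61b6968cbb", which
    # repair_uuid() turns back into the original id on the consuming side.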

    def _publish_result(self, connection, task_id, meta):
        # Cache a single producer channel on the connection.
        if hasattr(connection, "_result_producer_chan") and \
                connection._result_producer_chan is not None and \
                connection._result_producer_chan.connection is not None:
            channel = connection._result_producer_chan
        else:
            channel = connection._result_producer_chan = connection.channel()

        # The cached channel is deliberately left open so the next publish
        # can reuse it; closing it here would defeat the cache above.
        self._create_producer(task_id, channel).publish(meta)

    def revive(self, channel):
        pass

    def _store_result(self, task_id, result, status, traceback=None,
            max_retries=20, interval_start=0, interval_step=1,
            interval_max=1):
        """Send task return value and status."""
        conn = self.pool.acquire(block=True)
        try:
            send = conn.ensure(self, self._publish_result,
                               max_retries=max_retries,
                               interval_start=interval_start,
                               interval_step=interval_step,
                               interval_max=interval_max)
            send(conn, task_id, {"task_id": task_id, "status": status,
                                 "result": self.encode_result(result, status),
                                 "traceback": traceback})
        finally:
            conn.release()
        return result
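
    # Sketch of the published message body (illustrative only; field values
    # depend on encode_result and the configured serializer):
    #
    #     {"task_id": "390230c0-...", "status": "SUCCESS",
    #      "result": 42, "traceback": None}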

    def get_task_meta(self, task_id, cache=True):
        if cache and task_id in self._cache:
            return self._cache[task_id]
        return self.poll(task_id)

    def wait_for(self, task_id, timeout=None, cache=True, propagate=True,
            **kwargs):
        cached_meta = self._cache.get(task_id)

        if cache and cached_meta and \
                cached_meta["status"] in states.READY_STATES:
            meta = cached_meta
        else:
            try:
                meta = self.consume(task_id, timeout=timeout)
            except socket.timeout:
                raise TimeoutError("The operation timed out.")

        state = meta["status"]
        if state == states.SUCCESS:
            return meta["result"]
        elif state in states.PROPAGATE_STATES:
            if propagate:
                raise self.exception_to_python(meta["result"])
            return meta["result"]
        else:
            # Not ready yet: keep waiting (pass propagate along so the
            # recursive call keeps the caller's error-raising behaviour).
            return self.wait_for(task_id, timeout, cache, propagate)
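
    # Hedged usage sketch (assumes an app-bound backend and a known id):
    #
    #     >>> backend.wait_for(task_id, timeout=5.0)
    #     42
    #
    # Raises TimeoutError if no ready state arrives within `timeout`.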

    def poll(self, task_id):
        conn = self.pool.acquire(block=True)
        channel = conn.channel()
        try:
            binding = self._create_binding(task_id)(channel)
            binding.declare()
            result = binding.get()
            if result:
                payload = self._cache[task_id] = result.payload
                return payload
            elif task_id in self._cache:
                # Use previously received state.
                return self._cache[task_id]
            return {"status": states.PENDING, "result": None}
        finally:
            channel.close()
            conn.release()

    def drain_events(self, connection, consumer, timeout=None, now=time.time):
        wait = connection.drain_events
        results = {}

        def callback(meta, message):
            if meta["status"] in states.READY_STATES:
                uuid = repair_uuid(message.delivery_info["routing_key"])
                results[uuid] = meta
        consumer.register_callback(callback)

        time_start = now()
        while 1:
            # Total time spent may exceed a single call to wait().
            if timeout and now() - time_start >= timeout:
                raise socket.timeout()
            wait(timeout=timeout)
            if results:
                # Got an event on the wanted channel.
                break
        self._cache.update(results)
        return results

    def consume(self, task_id, timeout=None):
        conn = self.pool.acquire(block=True)
        channel = conn.channel()
        try:
            binding = self._create_binding(task_id)
            consumer = self._create_consumer(binding, channel)
            consumer.consume()
            try:
                return self.drain_events(conn, consumer, timeout).values()[0]
            finally:
                consumer.cancel()
        finally:
            channel.close()
            conn.release()

    def get_many(self, task_ids, timeout=None):
        conn = self.pool.acquire(block=True)
        channel = conn.channel()
        try:
            ids = set(task_ids)
            cached_ids = set()
            for task_id in ids:
                try:
                    cached = self._cache[task_id]
                except KeyError:
                    pass
                else:
                    if cached["status"] in states.READY_STATES:
                        yield task_id, cached
                        cached_ids.add(task_id)
            ids ^= cached_ids

            bindings = [self._create_binding(task_id) for task_id in task_ids]
            consumer = self._create_consumer(bindings, channel)
            consumer.consume()
            try:
                while ids:
                    r = self.drain_events(conn, consumer, timeout)
                    ids ^= set(r.keys())
                    for ready_id, ready_meta in r.items():
                        yield ready_id, ready_meta
            except:  # Py2.4: cannot yield inside a try/finally block,
                consumer.cancel()  # so clean up by hand and re-raise.
                raise
            consumer.cancel()
        except:  # Same Py2.4 limitation as above.
            channel.close()
            conn.release()
            raise
        channel.close()
        conn.release()
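
    # Hedged sketch of consuming get_many (hypothetical task ids):
    #
    #     >>> for tid, meta in backend.get_many(["id1", "id2"], timeout=10):
    #     ...     print tid, meta["status"]
    #
    # Ready results are yielded as they arrive, cached ones first.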

    def reload_task_result(self, task_id):
        raise NotImplementedError(
                "reload_task_result is not supported by this backend.")

    def reload_taskset_result(self, taskset_id):
        """Reload taskset result, even if it has been previously fetched."""
        raise NotImplementedError(
                "reload_taskset_result is not supported by this backend.")

    def save_taskset(self, taskset_id, result):
        """Store the result and status of a taskset."""
        raise NotImplementedError(
                "save_taskset is not supported by this backend.")

    def restore_taskset(self, taskset_id, cache=True):
        """Get the result of a taskset."""
        raise NotImplementedError(
                "restore_taskset is not supported by this backend.")

    @cached_property
    def pool(self):
        return self.app.broker_connection().Pool(self.connection_max)
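

# Illustrative end-to-end sketch (editorial; the app object and ids are
# assumptions, not part of this module): storing and fetching a result
# through this backend goes roughly like this:
#
#     backend = AMQPBackend(app=app)
#     backend.store_result(task_id, 42, states.SUCCESS)  # publishes meta
#     backend.wait_for(task_id, timeout=5.0)             # -> 42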