amqp.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. """celery.backends.amqp"""
  2. import socket
  3. import time
  4. import warnings
  5. from datetime import timedelta
  6. from carrot.messaging import Consumer, Publisher
  7. from celery import states
  8. from celery.backends.base import BaseDictBackend
  9. from celery.exceptions import TimeoutError
  10. from celery.utils import timeutils
  11. class AMQResultWarning(UserWarning):
  12. pass
  13. class ResultPublisher(Publisher):
  14. exchange = conf.RESULT_EXCHANGE
  15. exchange_type = conf.RESULT_EXCHANGE_TYPE
  16. delivery_mode = conf.RESULT_PERSISTENT and 2 or 1
  17. serializer = conf.RESULT_SERIALIZER
  18. durable = conf.RESULT_PERSISTENT
  19. auto_delete = True
  20. def __init__(self, connection, task_id, **kwargs):
  21. super(ResultPublisher, self).__init__(connection,
  22. routing_key=task_id.replace("-", ""),
  23. **kwargs)
  24. class ResultConsumer(Consumer):
  25. no_ack = True
  26. auto_delete = True
  27. def __init__(self, connection, task_id, **kwargs):
  28. routing_key = task_id.replace("-", "")
  29. super(ResultConsumer, self).__init__(connection,
  30. queue=routing_key, routing_key=routing_key, **kwargs)
  31. class AMQPBackend(BaseDictBackend):
  32. """AMQP backend. Publish results by sending messages to the broker
  33. using the task id as routing key.
  34. **NOTE:** Results published using this backend is read-once only.
  35. After the result has been read, the result is deleted. (however, it's
  36. still cached locally by the backend instance).
  37. """
  38. _connection = None
  39. def __init__(self, connection=None, exchange=None, exchange_type=None,
  40. persistent=None, serializer=None, auto_delete=False,
  41. expires=None, **kwargs):
  42. super(AMQPBackend, self).__init__(**kwargs)
  43. conf = self.app.conf
  44. self._connection = connection
  45. self.queue_arguments = {}
  46. self.exchange = exchange or conf.CELERY_RESULT_EXCHANGE
  47. self.exchange_type = exchange_type or conf.CELERY_RESULT_EXCHANGE_TYPE
  48. if persistent is None:
  49. persistent = conf.CELERY_RESULT_PERSISTENT
  50. self.persistent = persistent
  51. self.serializer = serializer or conf.CELERY_RESULT_SERIALIZER
  52. self.auto_delete = auto_delete
  53. self.expires = expires
  54. if self.expires is None:
  55. self.expires = conf.CELERY_AMQP_TASK_RESULT_EXPIRES
  56. if isinstance(self.expires, timedelta):
  57. self.expires = timeutils.timedelta_seconds(self.expires)
  58. if self.expires is not None:
  59. self.expires = int(self.expires)
  60. # WARNING: Requires RabbitMQ 2.1.0 or higher.
  61. # x-expires must be a signed-int, or long describing
  62. # the expiry time in milliseconds.
  63. self.queue_arguments["x-expires"] = int(self.expires * 1000.0)
  64. def _create_publisher(self, task_id, connection):
  65. delivery_mode = self.persistent and 2 or 1
  66. # Declares the queue.
  67. self._create_consumer(task_id, connection).close()
  68. return ResultPublisher(connection, task_id,
  69. exchange=self.exchange,
  70. exchange_type=self.exchange_type,
  71. delivery_mode=delivery_mode,
  72. durable=self.persistent,
  73. serializer=self.serializer,
  74. auto_delete=self.auto_delete)
  75. def _create_consumer(self, task_id, connection):
  76. return ResultConsumer(connection, task_id,
  77. exchange=self.exchange,
  78. exchange_type=self.exchange_type,
  79. durable=self.persistent,
  80. auto_delete=self.auto_delete,
  81. queue_arguments=self.queue_arguments)
  82. def store_result(self, task_id, result, status, traceback=None,
  83. max_retries=20, retry_delay=0.2):
  84. """Send task return value and status."""
  85. result = self.encode_result(result, status)
  86. meta = {"task_id": task_id,
  87. "result": result,
  88. "status": status,
  89. "traceback": traceback}
  90. for i in range(max_retries + 1):
  91. try:
  92. publisher = self._create_publisher(task_id, self.connection)
  93. publisher.send(meta)
  94. publisher.close()
  95. except Exception, exc:
  96. if not max_retries:
  97. raise
  98. self._connection = None
  99. warnings.warn(AMQResultWarning(
  100. "Error sending result %s: %r" % (task_id, exc)))
  101. time.sleep(retry_delay)
  102. break
  103. return result
  104. def get_task_meta(self, task_id, cache=True):
  105. return self.poll(task_id)
  106. def wait_for(self, task_id, timeout=None, cache=True):
  107. cached_meta = self._cache.get(task_id)
  108. if cached_meta and cached_meta["status"] in states.READY_STATES:
  109. meta = cached_meta
  110. else:
  111. try:
  112. meta = self.consume(task_id, timeout=timeout)
  113. except socket.timeout:
  114. raise TimeoutError("The operation timed out.")
  115. if meta["status"] == states.SUCCESS:
  116. return meta["result"]
  117. elif meta["status"] in states.PROPAGATE_STATES:
  118. raise self.exception_to_python(meta["result"])
  119. else:
  120. return self.wait_for(task_id, timeout, cache)
  121. def poll(self, task_id):
  122. consumer = self._create_consumer(task_id, self.connection)
  123. result = consumer.fetch()
  124. try:
  125. if result:
  126. payload = self._cache[task_id] = result.payload
  127. return payload
  128. else:
  129. # Use previously received status if any.
  130. if task_id in self._cache:
  131. return self._cache[task_id]
  132. return {"status": states.PENDING, "result": None}
  133. finally:
  134. consumer.close()
  135. def consume(self, task_id, timeout=None):
  136. results = []
  137. def callback(meta, message):
  138. if meta["status"] in states.READY_STATES:
  139. results.append(meta)
  140. wait = self.connection.drain_events
  141. consumer = self._create_consumer(task_id, self.connection)
  142. consumer.register_callback(callback)
  143. consumer.consume()
  144. try:
  145. time_start = time.time()
  146. while True:
  147. # Total time spent may exceed a single call to wait()
  148. if timeout and time.time() - time_start >= timeout:
  149. raise socket.timeout()
  150. wait(timeout=timeout)
  151. if results:
  152. # Got event on the wanted channel.
  153. break
  154. finally:
  155. consumer.close()
  156. self._cache[task_id] = results[0]
  157. return results[0]
  158. def close(self):
  159. if self._connection is not None:
  160. self._connection.close()
  161. @property
  162. def connection(self):
  163. if not self._connection:
  164. self._connection = self.app.broker_connection()
  165. return self._connection
  166. def reload_task_result(self, task_id):
  167. raise NotImplementedError(
  168. "reload_task_result is not supported by this backend.")
  169. def reload_taskset_result(self, task_id):
  170. """Reload taskset result, even if it has been previously fetched."""
  171. raise NotImplementedError(
  172. "reload_taskset_result is not supported by this backend.")
  173. def save_taskset(self, taskset_id, result):
  174. """Store the result and status of a task."""
  175. raise NotImplementedError(
  176. "save_taskset is not supported by this backend.")
  177. def restore_taskset(self, taskset_id, cache=True):
  178. """Get the result of a taskset."""
  179. raise NotImplementedError(
  180. "restore_taskset is not supported by this backend.")