amqp.py

  1. """celery.backends.amqp"""
  2. import socket
  3. import time
  4. import warnings
  5. from datetime import timedelta
  6. from carrot.messaging import Consumer, Publisher
  7. from celery import states
  8. from celery.backends.base import BaseDictBackend
  9. from celery.exceptions import TimeoutError
  10. from celery.utils import timeutils


class AMQResultWarning(UserWarning):
    pass


class ResultPublisher(Publisher):
    auto_delete = True

    def __init__(self, connection, task_id, **kwargs):
        super(ResultPublisher, self).__init__(connection,
                routing_key=task_id.replace("-", ""),
                **kwargs)


class ResultConsumer(Consumer):
    no_ack = True
    auto_delete = True

    def __init__(self, connection, task_id, **kwargs):
        routing_key = task_id.replace("-", "")
        super(ResultConsumer, self).__init__(connection,
                queue=routing_key, routing_key=routing_key, **kwargs)


class AMQPBackend(BaseDictBackend):
    """AMQP backend. Publish results by sending messages to the broker
    using the task id as routing key.

    **NOTE:** Results published using this backend are read-once only.
    After a result has been read, it is deleted (it is, however, still
    cached locally by the backend instance).

    """

    _connection = None

    def __init__(self, connection=None, exchange=None, exchange_type=None,
            persistent=None, serializer=None, auto_delete=True,
            expires=None, **kwargs):
        super(AMQPBackend, self).__init__(**kwargs)
        conf = self.app.conf

        self._connection = connection
        self.queue_arguments = {}
        self.exchange = exchange or conf.CELERY_RESULT_EXCHANGE
        self.exchange_type = exchange_type or conf.CELERY_RESULT_EXCHANGE_TYPE
        if persistent is None:
            persistent = conf.CELERY_RESULT_PERSISTENT
        self.persistent = persistent
        self.serializer = serializer or conf.CELERY_RESULT_SERIALIZER
        self.auto_delete = auto_delete

        self.expires = expires
        if self.expires is None:
            self.expires = conf.CELERY_AMQP_TASK_RESULT_EXPIRES
        if isinstance(self.expires, timedelta):
            self.expires = timeutils.timedelta_seconds(self.expires)
        if self.expires is not None:
            self.expires = int(self.expires)
            # WARNING: Requires RabbitMQ 2.1.0 or higher.
            # x-expires must be a signed-int, or long describing
            # the expiry time in milliseconds.
            self.queue_arguments["x-expires"] = int(self.expires * 1000.0)
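
    # Worked example of the expiry handling above (an illustration, not in
    # the original source).  Assuming CELERY_AMQP_TASK_RESULT_EXPIRES is
    # unset and expires=timedelta(days=1) is passed in, self.expires becomes
    # 86400 and the result queues are declared with {"x-expires": 86400000},
    # so the broker drops an unused result queue after one day.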

    def _create_publisher(self, task_id, connection):
        delivery_mode = self.persistent and 2 or 1

        # Declares the queue.
        self._create_consumer(task_id, connection).close()

        return ResultPublisher(connection, task_id,
                               exchange=self.exchange,
                               exchange_type=self.exchange_type,
                               delivery_mode=delivery_mode,
                               durable=self.persistent,
                               serializer=self.serializer,
                               auto_delete=self.auto_delete)

    def _create_consumer(self, task_id, connection):
        return ResultConsumer(connection, task_id,
                              exchange=self.exchange,
                              exchange_type=self.exchange_type,
                              durable=self.persistent,
                              auto_delete=self.auto_delete,
                              queue_arguments=self.queue_arguments)

    def store_result(self, task_id, result, status, traceback=None,
            max_retries=20, retry_delay=0.2):
        """Send task return value and status."""
        result = self.encode_result(result, status)

        meta = {"task_id": task_id,
                "result": result,
                "status": status,
                "traceback": traceback}

        for i in range(max_retries + 1):
            try:
                publisher = self._create_publisher(task_id, self.connection)
                publisher.send(meta)
                publisher.close()
            except Exception, exc:
                if not max_retries:
                    raise
                # Force a fresh connection on the next attempt.
                self._connection = None
                warnings.warn(AMQResultWarning(
                    "Error sending result %s: %r" % (task_id, exc)))
                time.sleep(retry_delay)
            else:
                # Published successfully; stop retrying.
                break

        return result
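
    # Example of the message body published above (values are hypothetical;
    # the serializer and routing key come from __init__):
    #
    #   {"task_id": "dd92a050-...", "result": 42,
    #    "status": "SUCCESS", "traceback": None}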

    def get_task_meta(self, task_id, cache=True):
        return self.poll(task_id)

    def wait_for(self, task_id, timeout=None, cache=True):
        cached_meta = self._cache.get(task_id)

        if cached_meta and cached_meta["status"] in states.READY_STATES:
            meta = cached_meta
        else:
            try:
                meta = self.consume(task_id, timeout=timeout)
            except socket.timeout:
                raise TimeoutError("The operation timed out.")

        if meta["status"] == states.SUCCESS:
            return meta["result"]
        elif meta["status"] in states.PROPAGATE_STATES:
            raise self.exception_to_python(meta["result"])
        else:
            return self.wait_for(task_id, timeout, cache)

    def poll(self, task_id):
        consumer = self._create_consumer(task_id, self.connection)
        result = consumer.fetch()
        try:
            if result:
                payload = self._cache[task_id] = result.payload
                return payload
            else:
                # Use previously received status if any.
                if task_id in self._cache:
                    return self._cache[task_id]

                return {"status": states.PENDING, "result": None}
        finally:
            consumer.close()
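
    # Sketch of poll() outcomes (an illustration, not in the original
    # source; the task id is hypothetical).  Each call does one
    # non-blocking fetch from the result queue:
    #
    #   backend.poll(uuid)   # nothing published yet
    #   # -> {"status": "PENDING", "result": None}
    #   backend.poll(uuid)   # result message arrived -> payload, now cached
    #   backend.poll(uuid)   # queue empty again -> cached payload returned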

    def consume(self, task_id, timeout=None):
        results = []

        def callback(meta, message):
            if meta["status"] in states.READY_STATES:
                results.append(meta)

        wait = self.connection.drain_events
        consumer = self._create_consumer(task_id, self.connection)
        consumer.register_callback(callback)

        consumer.consume()
        try:
            time_start = time.time()
            while True:
                # Total time spent may exceed a single call to wait()
                if timeout and time.time() - time_start >= timeout:
                    raise socket.timeout()
                wait(timeout=timeout)
                if results:
                    # Got event on the wanted channel.
                    break
        finally:
            consumer.close()

        self._cache[task_id] = results[0]
        return results[0]

    def close(self):
        if self._connection is not None:
            self._connection.close()

    @property
    def connection(self):
        if not self._connection:
            self._connection = self.app.broker_connection()
        return self._connection

    def reload_task_result(self, task_id):
        raise NotImplementedError(
                "reload_task_result is not supported by this backend.")

    def reload_taskset_result(self, task_id):
        """Reload taskset result, even if it has been previously fetched."""
        raise NotImplementedError(
                "reload_taskset_result is not supported by this backend.")

    def save_taskset(self, taskset_id, result):
        """Store the result and status of a taskset."""
        raise NotImplementedError(
                "save_taskset is not supported by this backend.")

    def restore_taskset(self, taskset_id, cache=True):
        """Get the result of a taskset."""
        raise NotImplementedError(
                "restore_taskset is not supported by this backend.")