amqp.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. """celery.backends.amqp"""
  2. import socket
  3. import time
  4. from datetime import timedelta
  5. from carrot.messaging import Consumer, Publisher
  6. from celery import conf
  7. from celery import states
  8. from celery.backends.base import BaseDictBackend
  9. from celery.exceptions import TimeoutError
  10. from celery.messaging import establish_connection
  11. from celery.utils import timeutils
  12. class ResultPublisher(Publisher):
  13. exchange = conf.RESULT_EXCHANGE
  14. exchange_type = conf.RESULT_EXCHANGE_TYPE
  15. delivery_mode = conf.RESULT_PERSISTENT and 2 or 1
  16. serializer = conf.RESULT_SERIALIZER
  17. durable = conf.RESULT_PERSISTENT
  18. auto_delete = True
  19. def __init__(self, connection, task_id, **kwargs):
  20. super(ResultPublisher, self).__init__(connection,
  21. routing_key=task_id.replace("-", ""),
  22. **kwargs)
  23. class ResultConsumer(Consumer):
  24. exchange = conf.RESULT_EXCHANGE
  25. exchange_type = conf.RESULT_EXCHANGE_TYPE
  26. durable = conf.RESULT_PERSISTENT
  27. no_ack = True
  28. auto_delete = True
  29. def __init__(self, connection, task_id, expires=None, **kwargs):
  30. routing_key = task_id.replace("-", "")
  31. if expires is not None:
  32. pass
  33. #self.queue_arguments = {"x-expires": expires}
  34. super(ResultConsumer, self).__init__(connection,
  35. queue=routing_key, routing_key=routing_key, **kwargs)
  36. class AMQPBackend(BaseDictBackend):
  37. """AMQP backend. Publish results by sending messages to the broker
  38. using the task id as routing key.
  39. **NOTE:** Results published using this backend is read-once only.
  40. After the result has been read, the result is deleted. (however, it's
  41. still cached locally by the backend instance).
  42. """
  43. _connection = None
  44. def __init__(self, connection=None, exchange=None, exchange_type=None,
  45. persistent=None, serializer=None, auto_delete=None,
  46. expires=None, **kwargs):
  47. self._connection = connection
  48. self.exchange = exchange
  49. self.exchange_type = exchange_type
  50. self.persistent = persistent
  51. self.serializer = serializer
  52. self.auto_delete = auto_delete
  53. self.expires = expires
  54. if self.expires is None:
  55. self.expires = conf.TASK_RESULT_EXPIRES
  56. if isinstance(self.expires, timedelta):
  57. self.expires = timeutils.timedelta_seconds(self.expires)
  58. if self.expires is not None:
  59. self.expires = int(self.expires)
  60. super(AMQPBackend, self).__init__(**kwargs)
  61. def _create_publisher(self, task_id, connection):
  62. delivery_mode = self.persistent and 2 or 1
  63. # Declares the queue.
  64. self._create_consumer(task_id, connection).close()
  65. return ResultPublisher(connection, task_id,
  66. exchange=self.exchange,
  67. exchange_type=self.exchange_type,
  68. delivery_mode=delivery_mode,
  69. serializer=self.serializer,
  70. auto_delete=self.auto_delete)
  71. def _create_consumer(self, task_id, connection):
  72. return ResultConsumer(connection, task_id,
  73. exchange=self.exchange,
  74. exchange_type=self.exchange_type,
  75. durable=self.persistent,
  76. auto_delete=self.auto_delete,
  77. expires=self.expires)
  78. def store_result(self, task_id, result, status, traceback=None):
  79. """Send task return value and status."""
  80. result = self.encode_result(result, status)
  81. meta = {"task_id": task_id,
  82. "result": result,
  83. "status": status,
  84. "traceback": traceback}
  85. publisher = self._create_publisher(task_id, self.connection)
  86. try:
  87. publisher.send(meta)
  88. finally:
  89. publisher.close()
  90. return result
  91. def get_task_meta(self, task_id, cache=True):
  92. return self.poll(task_id)
  93. def wait_for(self, task_id, timeout=None, cache=True):
  94. time_start = time()
  95. if task_id in self._cache:
  96. cached_meta = self._cache[task_id]
  97. if cached_meta and cached_meta["status"] in states.READY_STATES:
  98. meta = cached_meta
  99. else:
  100. try:
  101. meta = self.consume(task_id, timeout=timeout)
  102. except socket.timeout:
  103. raise TimeoutError("The operation timed out.")
  104. if meta["status"] == states.SUCCESS:
  105. return meta["result"]
  106. elif meta["status"] in states.PROPAGATE_STATES:
  107. raise self.exception_to_python(meta["result"])
  108. else:
  109. return self.wait_for(task_id, timeout, cache)
  110. def poll(self, task_id):
  111. consumer = self._create_consumer(task_id, self.connection)
  112. result = consumer.fetch()
  113. try:
  114. if result:
  115. payload = self._cache[task_id] = result.payload
  116. return payload
  117. else:
  118. # Use previously received status if any.
  119. if task_id in self._cache:
  120. return self._cache[task_id]
  121. return {"status": states.PENDING, "result": None}
  122. finally:
  123. consumer.close()
  124. def consume(self, task_id, timeout=None):
  125. results = []
  126. def callback(message_data, message):
  127. results.append(message_data)
  128. wait = self.connection.drain_events
  129. consumer = self._create_consumer(task_id, self.connection)
  130. consumer.register_callback(callback)
  131. consumer.consume()
  132. try:
  133. time_start = time.time()
  134. while True:
  135. # Total time spent may exceed a single call to wait()
  136. if timeout and time.time() - time_start >= timeout:
  137. raise socket.timeout()
  138. wait(timeout=timeout)
  139. if results:
  140. # Got event on the wanted channel.
  141. break
  142. finally:
  143. consumer.close()
  144. self._cache[task_id] = results[0]
  145. return results[0]
  146. def close(self):
  147. if self._connection is not None:
  148. self._connection.close()
  149. @property
  150. def connection(self):
  151. if not self._connection:
  152. self._connection = establish_connection()
  153. return self._connection
  154. def reload_task_result(self, task_id):
  155. raise NotImplementedError(
  156. "reload_task_result is not supported by this backend.")
  157. def reload_taskset_result(self, task_id):
  158. """Reload taskset result, even if it has been previously fetched."""
  159. raise NotImplementedError(
  160. "reload_taskset_result is not supported by this backend.")
  161. def save_taskset(self, taskset_id, result):
  162. """Store the result and status of a task."""
  163. raise NotImplementedError(
  164. "save_taskset is not supported by this backend.")
  165. def restore_taskset(self, taskset_id, cache=True):
  166. """Get the result of a taskset."""
  167. raise NotImplementedError(
  168. "restore_taskset is not supported by this backend.")