amqp.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. """celery.backends.amqp"""
  2. from carrot.messaging import Consumer, Publisher
  3. from celery import conf
  4. from celery import states
  5. from celery.backends.base import BaseDictBackend
  6. from celery.messaging import establish_connection
  7. from celery.datastructures import LocalCache
  8. class AMQPBackend(BaseDictBackend):
  9. """AMQP backend. Publish results by sending messages to the broker
  10. using the task id as routing key.
  11. **NOTE:** Results published using this backend is read-once only.
  12. After the result has been read, the result is deleted. (however, it's
  13. still cached locally by the backend instance).
  14. """
  15. exchange = conf.RESULT_EXCHANGE
  16. capabilities = ["ResultStore"]
  17. _connection = None
  18. _use_debug_tracking = False
  19. _seen = set()
  20. def __init__(self, *args, **kwargs):
  21. super(AMQPBackend, self).__init__(*args, **kwargs)
  22. @property
  23. def connection(self):
  24. if not self._connection:
  25. self._connection = establish_connection()
  26. return self._connection
  27. def _declare_queue(self, task_id, connection):
  28. routing_key = task_id.replace("-", "")
  29. backend = connection.create_backend()
  30. backend.queue_declare(queue=routing_key, durable=True,
  31. exclusive=False, auto_delete=True)
  32. backend.exchange_declare(exchange=self.exchange,
  33. type="direct",
  34. durable=True,
  35. auto_delete=False)
  36. backend.queue_bind(queue=routing_key, exchange=self.exchange,
  37. routing_key=routing_key)
  38. backend.close()
  39. def _publisher_for_task_id(self, task_id, connection):
  40. routing_key = task_id.replace("-", "")
  41. self._declare_queue(task_id, connection)
  42. p = Publisher(connection, exchange=self.exchange,
  43. exchange_type="direct",
  44. routing_key=routing_key)
  45. return p
  46. def _consumer_for_task_id(self, task_id, connection):
  47. routing_key = task_id.replace("-", "")
  48. self._declare_queue(task_id, connection)
  49. return Consumer(connection, queue=routing_key,
  50. exchange=self.exchange,
  51. exchange_type="direct",
  52. no_ack=False, auto_ack=False,
  53. auto_delete=True,
  54. routing_key=routing_key)
  55. def store_result(self, task_id, result, status, traceback=None):
  56. """Send task return value and status."""
  57. result = self.encode_result(result, status)
  58. meta = {"task_id": task_id,
  59. "result": result,
  60. "status": status,
  61. "traceback": traceback}
  62. connection = self.connection
  63. publisher = self._publisher_for_task_id(task_id, connection)
  64. publisher.send(meta, serializer="pickle")
  65. publisher.close()
  66. return result
  67. def _get_task_meta_for(self, task_id):
  68. assert task_id not in self._seen
  69. self._use_debug_tracking and self._seen.add(task_id)
  70. results = []
  71. def callback(message_data, message):
  72. results.append(message_data)
  73. message.ack()
  74. routing_key = task_id.replace("-", "")
  75. connection = self.connection
  76. consumer = self._consumer_for_task_id(task_id, connection)
  77. consumer.register_callback(callback)
  78. did_exc = None
  79. try:
  80. consumer.iterconsume().next()
  81. except Exception, e:
  82. did_exc = e
  83. consumer.backend.channel.queue_delete(routing_key)
  84. consumer.close()
  85. if did_exc:
  86. raise did_exc
  87. self._cache[task_id] = results[0]
  88. return results[0]
  89. def reload_task_result(self, task_id):
  90. raise NotImplementedError(
  91. "reload_task_result is not supported by this backend.")
  92. def reload_taskset_result(self, task_id):
  93. """Reload taskset result, even if it has been previously fetched."""
  94. raise NotImplementedError(
  95. "reload_taskset_result is not supported by this backend.")
  96. def save_taskset(self, taskset_id, result):
  97. """Store the result and status of a task."""
  98. raise NotImplementedError(
  99. "save_taskset is not supported by this backend.")
  100. def restore_taskset(self, taskset_id, cache=True):
  101. """Get the result of a taskset."""
  102. raise NotImplementedError(
  103. "restore_taskset is not supported by this backend.")