# amqp.py (3.1 KB)
  1. """celery.backends.amqp"""
  2. from carrot.connection import DjangoBrokerConnection
  3. from carrot.messaging import Consumer, Publisher
  4. from celery.backends.base import BaseBackend
  5. RESULTSTORE_EXCHANGE = "celres"
  6. class Backend(BaseBackend):
  7. """AMQP backend. Publish results by sending messages to the broker
  8. using the task id as routing key.
  9. Note that results published using this backend is read-once only.
  10. After the result has been read, the result is deleted.
  11. """
  12. capabilities = ["ResultStore"]
  13. def __init__(self, *args, **kwargs):
  14. super(Backend, self).__init__(*args, **kwargs)
  15. self._cache = {}
  16. def _publisher_for_task_id(self, task_id, connection):
  17. routing_key = task_id.replace("-", "")
  18. return Publisher(connection, exchange=RESULTSTORE_EXCHANGE,
  19. exchange_type="direct",
  20. routing_key="%s" % routing_key)
  21. def _consumer_for_task_id(self, task_id, connection):
  22. routing_key = task_id.replace("-", "")
  23. return Consumer(connection, queue=task_id,
  24. exchange=RESULTSTORE_EXCHANGE,
  25. exchange_type="direct",
  26. routing_key="%s" % routing_key)
  27. def store_result(self, task_id, result, status):
  28. """Mark task as done (executed)."""
  29. if status == "DONE":
  30. result = self.prepare_result(result)
  31. elif status == "FAILURE":
  32. result = self.prepare_exception(result)
  33. meta = {"task_id": task_id,
  34. "result": result,
  35. "status": status}
  36. connection = DjangoBrokerConnection()
  37. publisher = self._publisher_for_task_id(task_id, connection)
  38. consumer = self._consumer_for_task_id(task_id, connection)
  39. c.fetch()
  40. publisher.send(meta, serializer="pickle", immediate=False)
  41. publisher.close()
  42. connection.close()
  43. return result
  44. def is_done(self, task_id):
  45. """Returns ``True`` if task with ``task_id`` has been executed."""
  46. return self.get_status(task_id) == "DONE"
  47. def get_status(self, task_id):
  48. """Get the status of a task."""
  49. return self._get_task_meta_for(task_id)["status"]
  50. def _get_task_meta_for(self, task_id):
  51. results = []
  52. def callback(message_data, message):
  53. results.append(message_data)
  54. message.ack()
  55. routing_key = task_id.replace("-", "")
  56. connection = DjangoBrokerConnection()
  57. consumer = self._consumer_for_task_id(task_id, connection)
  58. consumer.register_callback(callback)
  59. try:
  60. consumer.iterconsume().next()
  61. finally:
  62. consumer.close()
  63. connection.close()
  64. return results[0]
  65. def get_result(self, task_id):
  66. """Get the result for a task."""
  67. result = self._get_task_meta_for(task_id)
  68. if result["status"] == "FAILURE":
  69. return self.exception_to_python(result["result"])
  70. else:
  71. return result["result"]
  72. def cleanup(self):
  73. """Delete expired metadata."""
  74. pass