|
@@ -6,6 +6,7 @@ from celery.messaging import establish_connection
|
|
|
from celery.backends.base import BaseBackend
|
|
|
|
|
|
|
|
|
+
|
|
|
class AMQPBackend(BaseBackend):
|
|
|
"""AMQP backend. Publish results by sending messages to the broker
|
|
|
using the task id as routing key.
|
|
@@ -18,12 +19,18 @@ class AMQPBackend(BaseBackend):
|
|
|
|
|
|
exchange = conf.RESULT_EXCHANGE
|
|
|
capabilities = ["ResultStore"]
|
|
|
+ _connection = None
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
super(AMQPBackend, self).__init__(*args, **kwargs)
|
|
|
- self.connection = establish_connection()
|
|
|
self._cache = {}
|
|
|
|
|
|
+ @property
|
|
|
+ def connection(self):
|
|
|
+ if not self._connection:
|
|
|
+ self._connection = establish_connection()
|
|
|
+ return self._connection
|
|
|
+
|
|
|
def _declare_queue(self, task_id, connection):
|
|
|
routing_key = task_id.replace("-", "")
|
|
|
backend = connection.create_backend()
|