|
@@ -46,15 +46,17 @@ class AMQPBackend(BaseDictBackend):
|
|
|
|
|
|
"""
|
|
|
|
|
|
- exchange = conf.RESULT_EXCHANGE
|
|
|
- exchange_type = conf.RESULT_EXCHANGE_TYPE
|
|
|
- persistent = conf.RESULT_PERSISTENT
|
|
|
- serializer = conf.RESULT_SERIALIZER
|
|
|
_connection = None
|
|
|
|
|
|
- def __init__(self, *args, **kwargs):
|
|
|
- self._connection = kwargs.get("connection", None)
|
|
|
- super(AMQPBackend, self).__init__(*args, **kwargs)
|
|
|
+ def __init__(self, connection=None, exchange=None, exchange_type=None,
|
|
|
+ persistent=None, serializer=None, auto_delete=None, **kwargs):
|
|
|
+ self._connection = connection
|
|
|
+ self.exchange = exchange
|
|
|
+ self.exchange_type = exchange_type
|
|
|
+ self.persistent = persistent
|
|
|
+ self.serializer = serializer
|
|
|
+ self.auto_delete = auto_delete
|
|
|
+ super(AMQPBackend, self).__init__(**kwargs)
|
|
|
|
|
|
def _create_publisher(self, task_id, connection):
|
|
|
delivery_mode = self.persistent and 2 or 1
|
|
@@ -72,7 +74,8 @@ class AMQPBackend(BaseDictBackend):
|
|
|
return ResultConsumer(connection, task_id,
|
|
|
exchange=self.exchange,
|
|
|
exchange_type=self.exchange_type,
|
|
|
- durable=self.persistent)
|
|
|
+ durable=self.persistent,
|
|
|
+ auto_delete=self.auto_delete)
|
|
|
|
|
|
def store_result(self, task_id, result, status, traceback=None):
|
|
|
"""Send task return value and status."""
|
|
@@ -91,6 +94,9 @@ class AMQPBackend(BaseDictBackend):
|
|
|
|
|
|
return result
|
|
|
|
|
|
+ def get_task_meta(self, task_id, cache=True):
|
|
|
+ return self.poll(task_id)
|
|
|
+
|
|
|
def wait_for(self, task_id, timeout=None, cache=True):
|
|
|
if task_id in self._cache:
|
|
|
meta = self._cache[task_id]
|
|
@@ -106,22 +112,21 @@ class AMQPBackend(BaseDictBackend):
|
|
|
raise self.exception_to_python(meta["result"])
|
|
|
|
|
|
def poll(self, task_id):
|
|
|
- routing_key = task_id.replace("-", "")
|
|
|
consumer = self._create_consumer(task_id, self.connection)
|
|
|
result = consumer.fetch()
|
|
|
- payload = None
|
|
|
- if result:
|
|
|
- payload = self._cache[task_id] = result.payload
|
|
|
- consumer.backend.queue_delete(routing_key)
|
|
|
- else:
|
|
|
- # Use previously received status if any.
|
|
|
- if task_id in self._cache:
|
|
|
- payload = self._cache[task_id]
|
|
|
+ try:
|
|
|
+ if result:
|
|
|
+ payload = self._cache[task_id] = result.payload
|
|
|
+ return payload
|
|
|
else:
|
|
|
- payload = {"status": states.PENDING, "result": None}
|
|
|
|
|
|
- consumer.close()
|
|
|
- return payload
|
|
|
+ # Use previously received status if any.
|
|
|
+ if task_id in self._cache:
|
|
|
+ return self._cache[task_id]
|
|
|
+
|
|
|
+ return {"status": states.PENDING, "result": None}
|
|
|
+ finally:
|
|
|
+ consumer.close()
|
|
|
|
|
|
def consume(self, task_id, timeout=None):
|
|
|
results = []
|
|
@@ -129,8 +134,6 @@ class AMQPBackend(BaseDictBackend):
|
|
|
def callback(message_data, message):
|
|
|
results.append(message_data)
|
|
|
|
|
|
- routing_key = task_id.replace("-", "")
|
|
|
-
|
|
|
wait = self.connection.connection.wait_multi
|
|
|
consumer = self._create_consumer(task_id, self.connection)
|
|
|
consumer.register_callback(callback)
|
|
@@ -139,14 +142,20 @@ class AMQPBackend(BaseDictBackend):
|
|
|
try:
|
|
|
wait([consumer.backend.channel], timeout=timeout)
|
|
|
finally:
|
|
|
- consumer.backend.queue_delete(routing_key)
|
|
|
consumer.close()
|
|
|
|
|
|
self._cache[task_id] = results[0]
|
|
|
return results[0]
|
|
|
|
|
|
- def get_task_meta(self, task_id, cache=True):
|
|
|
- return self.poll(task_id)
|
|
|
+ def close(self):
|
|
|
+ if self._connection is not None:
|
|
|
+ self._connection.close()
|
|
|
+
|
|
|
+ @property
|
|
|
+ def connection(self):
|
|
|
+ if not self._connection:
|
|
|
+ self._connection = establish_connection()
|
|
|
+ return self._connection
|
|
|
|
|
|
def reload_task_result(self, task_id):
|
|
|
raise NotImplementedError(
|
|
@@ -166,13 +175,3 @@ class AMQPBackend(BaseDictBackend):
|
|
|
"""Get the result of a taskset."""
|
|
|
raise NotImplementedError(
|
|
|
"restore_taskset is not supported by this backend.")
|
|
|
-
|
|
|
- def close(self):
|
|
|
- if self._connection is not None:
|
|
|
- self._connection.close()
|
|
|
-
|
|
|
- @property
|
|
|
- def connection(self):
|
|
|
- if not self._connection:
|
|
|
- self._connection = establish_connection()
|
|
|
- return self._connection
|