|
@@ -116,8 +116,12 @@ class AMQPBackend(BaseDictBackend):
|
|
|
return self.poll(task_id)
|
|
|
|
|
|
def wait_for(self, task_id, timeout=None, cache=True):
|
|
|
+ time_start = time()
|
|
|
if task_id in self._cache:
|
|
|
- meta = self._cache[task_id]
|
|
|
+ cached_meta = self._cache[task_id]
|
|
|
+
|
|
|
+ if cached_meta and cached_meta["status"] in states.READY_STATES:
|
|
|
+ meta = cached_meta
|
|
|
else:
|
|
|
try:
|
|
|
meta = self.consume(task_id, timeout=timeout)
|
|
@@ -128,6 +132,9 @@ class AMQPBackend(BaseDictBackend):
|
|
|
return meta["result"]
|
|
|
elif meta["status"] in states.PROPAGATE_STATES:
|
|
|
raise self.exception_to_python(meta["result"])
|
|
|
+ else:
|
|
|
+ return self.wait_for(task_id, timeout, cache)
|
|
|
+
|
|
|
|
|
|
def poll(self, task_id):
|
|
|
consumer = self._create_consumer(task_id, self.connection)
|