|
@@ -115,8 +115,6 @@ class AMQPBackend(BaseDictBackend):
|
|
return result
|
|
return result
|
|
|
|
|
|
def get_task_meta(self, task_id, cache=True):
|
|
def get_task_meta(self, task_id, cache=True):
|
|
- if cache and task_id in self._cache:
|
|
|
|
- return self._cache[task_id]
|
|
|
|
return self.poll(task_id)
|
|
return self.poll(task_id)
|
|
|
|
|
|
def wait_for(self, task_id, timeout=None, cache=True, propagate=True,
|
|
def wait_for(self, task_id, timeout=None, cache=True, propagate=True,
|
|
@@ -147,9 +145,13 @@ class AMQPBackend(BaseDictBackend):
|
|
try:
|
|
try:
|
|
binding = self._create_binding(task_id)(channel)
|
|
binding = self._create_binding(task_id)(channel)
|
|
binding.declare()
|
|
binding.declare()
|
|
- result = binding.get()
|
|
|
|
- if result:
|
|
|
|
- payload = self._cache[task_id] = result.payload
|
|
|
|
|
|
+ latest, acc = None, None
|
|
|
|
+ while 1: # fetch the last state
|
|
|
|
+ latest, acc = acc, binding.get(no_ack=True)
|
|
|
|
+ if not acc:
|
|
|
|
+ break
|
|
|
|
+ if latest:
|
|
|
|
+ payload = self._cache[task_id] = latest.payload
|
|
return payload
|
|
return payload
|
|
elif task_id in self._cache: # use previously received state.
|
|
elif task_id in self._cache: # use previously received state.
|
|
return self._cache[task_id]
|
|
return self._cache[task_id]
|