|
@@ -157,19 +157,21 @@ class AMQPBackend(BaseDictBackend):
|
|
with self.app.pool.acquire_channel(block=True) as (_, channel):
|
|
with self.app.pool.acquire_channel(block=True) as (_, channel):
|
|
binding = self._create_binding(task_id)(channel)
|
|
binding = self._create_binding(task_id)(channel)
|
|
binding.declare()
|
|
binding.declare()
|
|
- latest, acc = None, None
|
|
|
|
- for i in xrange(backlog_limit):
|
|
|
|
- latest, acc = acc, binding.get(no_ack=True)
|
|
|
|
|
|
+ prev = latest = acc = None
|
|
|
|
+ for i in xrange(backlog_limit): ## spool ffwd
|
|
|
|
+ prev, latest, acc = latest, acc, binding.get(no_ack=False)
|
|
if not acc: # no more messages
|
|
if not acc: # no more messages
|
|
break
|
|
break
|
|
|
|
+ if prev:
|
|
|
|
+ # backends are not expected to keep history,
|
|
|
|
+ # so we delete everything except the most recent state.
|
|
|
|
+ prev.ack()
|
|
else:
|
|
else:
|
|
raise self.BacklogLimitExceeded(task_id)
|
|
raise self.BacklogLimitExceeded(task_id)
|
|
|
|
|
|
if latest:
|
|
if latest:
|
|
- # new state to report
|
|
|
|
- self._republish(channel, task_id, latest.body,
|
|
|
|
- latest.content_type, latest.content_encoding)
|
|
|
|
payload = self._cache[task_id] = latest.payload
|
|
payload = self._cache[task_id] = latest.payload
|
|
|
|
+ latest.requeue()
|
|
return payload
|
|
return payload
|
|
else:
|
|
else:
|
|
# no new state, use previous
|
|
# no new state, use previous
|