|
@@ -14,7 +14,7 @@ from kombu import Exchange, Queue, Producer, Consumer
|
|
|
from kombu.utils import register_after_fork
|
|
|
|
|
|
from celery import states
|
|
|
-from celery.five import range
|
|
|
+from celery.five import items, range
|
|
|
from celery.utils.functional import dictfilter
|
|
|
from celery.utils.log import get_logger
|
|
|
from celery.utils.timeutils import maybe_s_to_ms
|
|
@@ -202,55 +202,65 @@ class AMQPBackend(base.Backend, AsyncBackendMixin):
|
|
|
self._out_of_band[task_id] = message
|
|
|
|
|
|
def get_task_meta(self, task_id, backlog_limit=1000):
|
|
|
- try:
|
|
|
- buffered = self._out_of_band.pop(task_id)
|
|
|
- except KeyError:
|
|
|
- pass
|
|
|
- else:
|
|
|
- payload = self._cache[task_id] = self.meta_from_decoded(
|
|
|
- buffered.payload)
|
|
|
- return payload
|
|
|
+ buffered = self._out_of_band.pop(task_id, None)
|
|
|
+ if buffered:
|
|
|
+ return self._set_cache_by_message(task_id, buffered)
|
|
|
+
|
|
|
# Polling and using basic_get
|
|
|
+ latest_by_id = {}
|
|
|
+ prev = None
|
|
|
+ for acc in self._slurp_from_queue(task_id, self.accept, backlog_limit):
|
|
|
+ tid = self._get_message_task_id(acc)
|
|
|
+ prev, latest_by_id[tid] = latest_by_id.get(tid), acc
|
|
|
+ if prev:
|
|
|
+ # backends are not expected to keep history,
|
|
|
+ # so we delete everything except the most recent state.
|
|
|
+ prev.ack()
|
|
|
+ prev = None
|
|
|
+
|
|
|
+ latest = latest_by_id.pop(task_id, None)
|
|
|
+ for tid, msg in items(latest_by_id):
|
|
|
+ self.on_out_of_band_result(tid, msg)
|
|
|
+
|
|
|
+ if latest:
|
|
|
+ latest.requeue()
|
|
|
+ return self._set_cache_by_message(task_id, latest)
|
|
|
+ else:
|
|
|
+ # no new state, use previous
|
|
|
+ try:
|
|
|
+ return self._cache[task_id]
|
|
|
+ except KeyError:
|
|
|
+ # result probably pending.
|
|
|
+ return {'status': states.PENDING, 'result': None}
|
|
|
+ poll = get_task_meta # XXX compat
|
|
|
+
|
|
|
+ def _set_cache_by_message(self, task_id, message):
|
|
|
+ payload = self._cache[task_id] = self.meta_from_decoded(
|
|
|
+ message.payload)
|
|
|
+ return payload
|
|
|
+
|
|
|
+ def _slurp_from_queue(self, task_id, accept,
|
|
|
+ limit=1000, no_ack=False):
|
|
|
with self.app.pool.acquire_channel(block=True) as (_, channel):
|
|
|
binding = self._create_binding(task_id)(channel)
|
|
|
binding.declare()
|
|
|
|
|
|
- prev = latest = acc = None
|
|
|
- for i in range(backlog_limit): # spool ffwd
|
|
|
- acc = binding.get(
|
|
|
- accept=self.accept, no_ack=False,
|
|
|
- )
|
|
|
- if not acc: # no more messages
|
|
|
+ for i in range(limit):
|
|
|
+ msg = binding.get(accept=accept, no_ack=no_ack)
|
|
|
+ if not msg:
|
|
|
break
|
|
|
- try:
|
|
|
- message_task_id = acc.properties['correlation_id']
|
|
|
- except (AttributeError, KeyError):
|
|
|
- message_task_id = acc.payload['task_id']
|
|
|
- if message_task_id == task_id:
|
|
|
- prev, latest = latest, acc
|
|
|
- if prev:
|
|
|
- # backends are not expected to keep history,
|
|
|
- # so we delete everything except the most recent state.
|
|
|
- prev.ack()
|
|
|
- prev = None
|
|
|
- else:
|
|
|
- self.on_out_of_band_result(message_task_id, acc)
|
|
|
+ yield msg
|
|
|
else:
|
|
|
raise self.BacklogLimitExceeded(task_id)
|
|
|
|
|
|
- if latest:
|
|
|
- payload = self._cache[task_id] = self.meta_from_decoded(
|
|
|
- latest.payload)
|
|
|
- latest.requeue()
|
|
|
- return payload
|
|
|
- else:
|
|
|
- # no new state, use previous
|
|
|
- try:
|
|
|
- return self._cache[task_id]
|
|
|
- except KeyError:
|
|
|
- # result probably pending.
|
|
|
- return {'status': states.PENDING, 'result': None}
|
|
|
- poll = get_task_meta # XXX compat
|
|
|
+ def _get_message_task_id(self, message):
|
|
|
+ try:
|
|
|
+ # try property first so we don't have to deserialize
|
|
|
+ # the payload.
|
|
|
+ return message.properties['correlation_id']
|
|
|
+ except (AttributeError, KeyError):
|
|
|
+ # message sent by old Celery version, need to deserialize.
|
|
|
+ return message.payload['task_id']
|
|
|
|
|
|
def reload_task_result(self, task_id):
|
|
|
raise NotImplementedError(
|