|
@@ -461,6 +461,7 @@ class KeyValueStoreBackend(BaseBackend):
|
|
|
}
|
|
|
|
|
|
def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
|
|
|
+ on_message=None,
|
|
|
READY_STATES=states.READY_STATES):
|
|
|
interval = 0.5 if interval is None else interval
|
|
|
ids = task_ids if isinstance(task_ids, set) else set(task_ids)
|
|
@@ -485,6 +486,8 @@ class KeyValueStoreBackend(BaseBackend):
|
|
|
cache.update(r)
|
|
|
ids.difference_update({bytes_to_str(v) for v in r})
|
|
|
for key, value in items(r):
|
|
|
+ if on_message is not None:
|
|
|
+ on_message(value)
|
|
|
yield bytes_to_str(key), value
|
|
|
if timeout and iterations * interval >= timeout:
|
|
|
raise TimeoutError('Operation timed out ({0})'.format(timeout))
|