|
@@ -439,18 +439,25 @@ class KeyValueStoreBackend(BaseBackend):
|
|
return bytes_to_str(key[len(prefix):])
|
|
return bytes_to_str(key[len(prefix):])
|
|
return bytes_to_str(key)
|
|
return bytes_to_str(key)
|
|
|
|
|
|
|
|
+ def _filter_ready(self, values, READY_STATES=states.READY_STATES):
|
|
|
|
+ for k, v in values:
|
|
|
|
+ if v is not None:
|
|
|
|
+ v = self.decode_result(v)
|
|
|
|
+ if v['status'] in READY_STATES:
|
|
|
|
+ yield k, v
|
|
|
|
+
|
|
def _mget_to_results(self, values, keys):
|
|
def _mget_to_results(self, values, keys):
|
|
if hasattr(values, 'items'):
|
|
if hasattr(values, 'items'):
|
|
# client returns dict so mapping preserved.
|
|
# client returns dict so mapping preserved.
|
|
return {
|
|
return {
|
|
- self._strip_prefix(k): self.decode_result(v)
|
|
|
|
- for k, v in items(values) if v is not None
|
|
|
|
|
|
+ self._strip_prefix(k): v
|
|
|
|
+ for k, v in self._filter_ready(items(values))
|
|
}
|
|
}
|
|
else:
|
|
else:
|
|
# client returns list so need to recreate mapping.
|
|
# client returns list so need to recreate mapping.
|
|
return {
|
|
return {
|
|
- bytes_to_str(keys[i]): self.decode_result(value)
|
|
|
|
- for i, value in enumerate(values) if value is not None
|
|
|
|
|
|
+ bytes_to_str(keys[i]): v
|
|
|
|
+ for i, v in self._filter_ready(enumerate(values))
|
|
}
|
|
}
|
|
|
|
|
|
def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
|
|
def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
|