|
@@ -125,6 +125,8 @@ class AsyncBackendMixin(object):
|
|
|
if not results:
|
|
|
raise StopIteration()
|
|
|
|
|
|
+ # we tell the result consumer to put consumed results
|
|
|
+ # into these buckets.
|
|
|
bucket = deque()
|
|
|
for node in results:
|
|
|
if node._cache:
|
|
@@ -281,7 +283,11 @@ class BaseResultConsumer(object):
|
|
|
result._maybe_set_cache(meta)
|
|
|
buckets = self.buckets
|
|
|
try:
|
|
|
- buckets.pop(result)
|
|
|
+ # remove bucket for this result, since it's fulfilled
|
|
|
+ bucket = buckets.pop(result)
|
|
|
except KeyError:
|
|
|
pass
|
|
|
+ else:
|
|
|
+ # send to waiter via bucket
|
|
|
+ bucket.append(result)
|
|
|
sleep(0)
|