|
@@ -11,7 +11,7 @@ import socket
|
|
|
|
|
|
from collections import deque
|
|
|
from time import sleep
|
|
|
-from weakref import WeakKeyDictionary
|
|
|
+from weakref import WeakKeyDictionary, ref
|
|
|
|
|
|
from kombu.syn import detect_environment
|
|
|
from kombu.utils import cached_property
|
|
@@ -108,8 +108,7 @@ class AsyncBackendMixin(object):
|
|
|
def _collect_into(self, result, bucket):
|
|
|
self.result_consumer.buckets[result] = bucket
|
|
|
|
|
|
- def iter_native(self, result, timeout=None, interval=0.5, no_ack=True,
|
|
|
- on_message=None, on_interval=None):
|
|
|
+ def iter_native(self, result, no_ack=True, **kwargs):
|
|
|
results = result.results
|
|
|
if not results:
|
|
|
raise StopIteration()
|
|
@@ -121,10 +120,7 @@ class AsyncBackendMixin(object):
|
|
|
else:
|
|
|
self._collect_into(node, bucket)
|
|
|
|
|
|
- for _ in self._wait_for_pending(
|
|
|
- result,
|
|
|
- timeout=timeout, interval=interval, no_ack=no_ack,
|
|
|
- on_message=on_message, on_interval=on_interval):
|
|
|
+ for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
|
|
|
while bucket:
|
|
|
node = bucket.popleft()
|
|
|
yield node.id, node._cache
|
|
@@ -192,7 +188,7 @@ class BaseResultConsumer(object):
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
def _after_fork(self):
|
|
|
- self.bucket.clear()
|
|
|
+ self.buckets.clear()
|
|
|
self.buckets = WeakKeyDictionary()
|
|
|
self.on_message = None
|
|
|
self.on_after_fork()
|
|
@@ -237,7 +233,6 @@ class BaseResultConsumer(object):
|
|
|
result._maybe_set_cache(meta)
|
|
|
buckets = self.buckets
|
|
|
try:
|
|
|
- buckets[result].append(result)
|
|
|
buckets.pop(result)
|
|
|
except KeyError:
|
|
|
pass
|