|
@@ -5,9 +5,9 @@ RPC-style result backend, using reply-to and one queue per client.
|
|
|
"""
|
|
|
from __future__ import absolute_import, unicode_literals
|
|
|
|
|
|
+import kombu
|
|
|
import time
|
|
|
|
|
|
-from kombu import Consumer, Exchange, Producer, Queue
|
|
|
from kombu.common import maybe_declare
|
|
|
from kombu.utils.compat import register_after_fork
|
|
|
from kombu.utils.objects import cached_property
|
|
@@ -29,7 +29,7 @@ class BacklogLimitExceeded(Exception):
|
|
|
"""Too much state history to fast-forward."""
|
|
|
|
|
|
|
|
|
-class NoCacheQueue(Queue):
|
|
|
+class NoCacheQueue(kombu.Queue):
|
|
|
can_cache_declaration = False
|
|
|
|
|
|
|
|
@@ -38,7 +38,7 @@ def _on_after_fork_cleanup_backend(backend):
|
|
|
|
|
|
|
|
|
class ResultConsumer(BaseResultConsumer):
|
|
|
- Consumer = Consumer
|
|
|
+ Consumer = kombu.Consumer
|
|
|
|
|
|
_connection = None
|
|
|
_consumer = None
|
|
@@ -90,10 +90,10 @@ class ResultConsumer(BaseResultConsumer):
|
|
|
class BaseRPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
"""Base class for the RPC result backend."""
|
|
|
|
|
|
- Exchange = Exchange
|
|
|
+ Exchange = kombu.Exchange
|
|
|
Queue = NoCacheQueue
|
|
|
- Consumer = Consumer
|
|
|
- Producer = Producer
|
|
|
+ Consumer = kombu.Consumer
|
|
|
+ Producer = kombu.Producer
|
|
|
ResultConsumer = ResultConsumer
|
|
|
|
|
|
BacklogLimitExceeded = BacklogLimitExceeded
|
|
@@ -209,7 +209,7 @@ class BaseRPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
binding = self._create_binding(task_id)(channel)
|
|
|
binding.declare()
|
|
|
|
|
|
- for i in range(limit):
|
|
|
+ for _ in range(limit):
|
|
|
msg = binding.get(accept=accept, no_ack=no_ack)
|
|
|
if not msg:
|
|
|
break
|
|
@@ -268,14 +268,14 @@ class RPCBackend(BaseRPCBackend):
|
|
|
|
|
|
persistent = False
|
|
|
|
|
|
- class Consumer(Consumer):
|
|
|
+ class Consumer(kombu.Consumer):
|
|
|
"""Consumer that requires manual declaration of queues."""
|
|
|
|
|
|
auto_declare = False
|
|
|
|
|
|
def _create_exchange(self, name, type='direct', delivery_mode=2):
|
|
|
# uses direct to queue routing (anon exchange).
|
|
|
- return Exchange(None)
|
|
|
+ return self.Exchange(None)
|
|
|
|
|
|
def _create_binding(self, task_id):
|
|
|
return self.binding
|