|
@@ -20,7 +20,6 @@ from celery import routes as _routes
|
|
|
from celery import signals
|
|
|
from celery.utils import gen_unique_id, textindent
|
|
|
from celery.utils import promise, maybe_promise
|
|
|
-from celery.utils.compat import UserDict
|
|
|
|
|
|
#: List of known options to a Kombu producers send method.
|
|
|
#: Used to extract the message related options out of any `dict`.
|
|
@@ -46,7 +45,7 @@ def extract_msg_options(options, keep=MSG_OPTIONS):
|
|
|
return dict((name, options.get(name)) for name in keep)
|
|
|
|
|
|
|
|
|
-class Queues(UserDict):
|
|
|
+class Queues(dict):
|
|
|
"""Queue name⇒ declaration mapping.
|
|
|
|
|
|
Celery will consult this mapping to find the options
|
|
@@ -60,7 +59,7 @@ class Queues(UserDict):
|
|
|
_consume_from = None
|
|
|
|
|
|
def __init__(self, queues):
|
|
|
- self.data = {}
|
|
|
+ dict.__init__(self)
|
|
|
for queue_name, options in (queues or {}).items():
|
|
|
self.add(queue_name, **options)
|
|
|
|
|
@@ -90,12 +89,10 @@ class Queues(UserDict):
|
|
|
|
|
|
def format(self, indent=0, indent_first=True):
|
|
|
"""Format routing table into string for log dumps."""
|
|
|
- queues = self
|
|
|
- if self._consume_from is not None:
|
|
|
- queues = self._consume_from
|
|
|
+ active = self.consume_from
|
|
|
info = [QUEUE_FORMAT.strip() % dict(
|
|
|
name=(name + ":").ljust(12), **config)
|
|
|
- for name, config in sorted(queues.items())]
|
|
|
+ for name, config in sorted(active.iteritems())]
|
|
|
if indent_first:
|
|
|
return textindent("\n".join(info), indent)
|
|
|
return info[0] + "\n" + textindent("\n".join(info[1:]), indent)
|
|
@@ -125,6 +122,12 @@ class Queues(UserDict):
|
|
|
self._consume_from = acc
|
|
|
self.update(acc)
|
|
|
|
|
|
+ @property
|
|
|
+ def consume_from(self):
|
|
|
+ if self._consume_from is not None:
|
|
|
+ return self._consume_from
|
|
|
+ return self
|
|
|
+
|
|
|
@classmethod
|
|
|
def with_defaults(cls, queues, default_exchange, default_exchange_type):
|
|
|
"""Alternate constructor that adds default exchange and
|
|
@@ -136,11 +139,6 @@ class Queues(UserDict):
|
|
|
opts.setdefault("routing_key", opts.get("binding_key"))
|
|
|
return cls(queues)
|
|
|
|
|
|
- @property
|
|
|
- def consume_from(self):
|
|
|
- if self._consume_from is not None:
|
|
|
- return self._consume_from
|
|
|
- return self
|
|
|
|
|
|
|
|
|
class TaskPublisher(messaging.Publisher):
|