|
@@ -29,7 +29,7 @@ BROKER_FORMAT = """\
|
|
|
%(transport)s://%(userid)s@%(hostname)s%(port)s%(virtual_host)s\
|
|
|
"""
|
|
|
|
|
|
-#: Set to :cosnt:`True` when the configured queues has been declared.
|
|
|
+#: Set to :const:`True` when the configured queues has been declared.
|
|
|
_queues_declared = False
|
|
|
|
|
|
#: Set of exchange names that has already been declared.
|
|
@@ -37,6 +37,8 @@ _exchanges_declared = set()
|
|
|
|
|
|
|
|
|
def extract_msg_options(options, keep=MSG_OPTIONS):
|
|
|
+ """Extracts known options to `basic_publish` from a dict,
|
|
|
+ as a new dict."""
|
|
|
return dict((name, options.get(name)) for name in keep)
|
|
|
|
|
|
|
|
@@ -170,7 +172,7 @@ class AMQP(object):
|
|
|
def __init__(self, app):
|
|
|
self.app = app
|
|
|
|
|
|
- def ConsumerSet(self, *args, **kwargs):
|
|
|
+ def Consumer(self, *args, **kwargs):
|
|
|
return messaging.ConsumerSet(*args, **kwargs)
|
|
|
|
|
|
def Queues(self, queues):
|
|
@@ -211,8 +213,8 @@ class AMQP(object):
|
|
|
return publisher
|
|
|
|
|
|
def get_task_consumer(self, connection, queues=None, **kwargs):
|
|
|
- return self.ConsumerSet(connection, from_dict=queues or self.queues,
|
|
|
- **kwargs)
|
|
|
+ return self.Consumer(connection, from_dict=queues or self.queues,
|
|
|
+ **kwargs)
|
|
|
|
|
|
def get_default_queue(self):
|
|
|
q = self.app.conf.CELERY_DEFAULT_QUEUE
|