|
@@ -36,9 +36,6 @@ BROKER_FORMAT = """\
|
|
|
%(transport)s://%(userid)s@%(hostname)s%(port)s%(virtual_host)s\
|
|
|
"""
|
|
|
|
|
|
-#: Set to :const:`True` when the configured queues has been declared.
|
|
|
-_queues_declared = False
|
|
|
-
|
|
|
#: Set of exchange names that has already been declared.
|
|
|
_exchanges_declared = set()
|
|
|
|
|
@@ -207,6 +204,9 @@ class AMQP(object):
|
|
|
Consumer = messaging.Consumer
|
|
|
ConsumerSet = messaging.ConsumerSet
|
|
|
|
|
|
+ #: Set to :const:`True` when the configured queues has been declared.
|
|
|
+ _queues_declared = False
|
|
|
+
|
|
|
def __init__(self, app):
|
|
|
self.app = app
|
|
|
|
|
@@ -248,10 +248,9 @@ class AMQP(object):
|
|
|
**self.app.merge(defaults, kwargs))
|
|
|
|
|
|
# Make sure all queues are declared.
|
|
|
- global _queues_declared
|
|
|
- if not _queues_declared:
|
|
|
+ if not self._queues_declared:
|
|
|
self.get_task_consumer(publisher.connection).close()
|
|
|
- _queues_declared = True
|
|
|
+ self._queues_declared = True
|
|
|
publisher.declare()
|
|
|
|
|
|
return publisher
|
|
@@ -288,6 +287,7 @@ class AMQP(object):
|
|
|
|
|
|
@cached_property
|
|
|
def queues(self):
|
|
|
+ """Queue name⇒ declaration mapping."""
|
|
|
return self.Queues(self.app.conf.CELERY_QUEUES)
|
|
|
|
|
|
@queues.setter
|