|
@@ -31,10 +31,10 @@ from celery.app import app_or_default
|
|
|
from celery.canvas import subtask
|
|
|
from celery.five import items, values
|
|
|
from celery.task.trace import build_tracer
|
|
|
-from celery.utils.timer2 import default_timer, to_timestamp
|
|
|
from celery.utils.functional import noop
|
|
|
from celery.utils.log import get_logger
|
|
|
from celery.utils.text import truncate
|
|
|
+from celery.utils.timer2 import default_timer, to_timestamp
|
|
|
from celery.utils.timeutils import humanize_seconds, timezone
|
|
|
|
|
|
from . import heartbeat, loops, pidbox
|
|
@@ -276,16 +276,20 @@ class Consumer(object):
|
|
|
def add_task_queue(self, queue, exchange=None, exchange_type=None,
|
|
|
routing_key=None, **options):
|
|
|
cset = self.task_consumer
|
|
|
- try:
|
|
|
- q = self.app.amqp.queues[queue]
|
|
|
- except KeyError:
|
|
|
+ queues = self.app.amqp.queues
|
|
|
+ # Must use in' here, as __missing__ will automatically
|
|
|
+ # create queues when CELERY_CREATE_MISSING_QUEUES is enabled.
|
|
|
+ # (Issue #1079)
|
|
|
+ if queue in queues:
|
|
|
+ q = queues[queue]
|
|
|
+ else:
|
|
|
exchange = queue if exchange is None else exchange
|
|
|
- exchange_type = 'direct' if exchange_type is None \
|
|
|
- else exchange_type
|
|
|
- q = self.app.amqp.queues.select_add(queue,
|
|
|
- exchange=exchange,
|
|
|
- exchange_type=exchange_type,
|
|
|
- routing_key=routing_key, **options)
|
|
|
+ exchange_type = ('direct' if exchange_type is None
|
|
|
+ else exchange_type)
|
|
|
+ q = queues.select_add(queue,
|
|
|
+ exchange=exchange,
|
|
|
+ exchange_type=exchange_type,
|
|
|
+ routing_key=routing_key, **options)
|
|
|
if not cset.consuming_from(queue):
|
|
|
cset.add_queue(q)
|
|
|
cset.consume()
|