|
@@ -767,14 +767,18 @@ class Consumer(object):
|
|
|
def add_task_queue(self, queue, exchange=None, exchange_type=None,
|
|
|
routing_key=None, **options):
|
|
|
cset = self.task_consumer
|
|
|
- exchange = queue if exchange is None else exchange
|
|
|
- routing_key = queue if routing_key is None else routing_key
|
|
|
- exchange_type = 'direct' if exchange_type is None else exchange_type
|
|
|
- if not cset.consuming_from(queue):
|
|
|
+ try:
|
|
|
+ q = self.app.amqp.queues[queue]
|
|
|
+ except KeyError:
|
|
|
+ exchange = queue if exchange is None else exchange
|
|
|
+ routing_key = queue if routing_key is None else routing_key
|
|
|
+ exchange_type = 'direct' if exchange_type is None \
|
|
|
+ else exchange_type
|
|
|
q = self.app.amqp.queues.add(queue,
|
|
|
exchange=exchange,
|
|
|
exchange_type=exchange_type,
|
|
|
routing_key=routing_key, **options)
|
|
|
+ if not cset.consuming_from(queue):
|
|
|
cset.add_queue(q)
|
|
|
cset.consume()
|
|
|
logger.info('Started consuming from %r', queue)
|