Browse Source

add_task_queue must account for Queues.__missing__. Closes #1079

Ask Solem 12 years ago
parent
commit
0bdbcd49b1
1 changed files with 13 additions and 9 deletions
  1. 13 9
      celery/worker/consumer.py

+ 13 - 9
celery/worker/consumer.py

@@ -820,16 +820,20 @@ class Consumer(object):
     def add_task_queue(self, queue, exchange=None, exchange_type=None,
             routing_key=None, **options):
         cset = self.task_consumer
-        try:
-            q = self.app.amqp.queues[queue]
-        except KeyError:
+        queues = self.app.amqp.queues
+        # Must use in' here, as __missing__ will automatically
+        # create queues when CELERY_CREATE_MISSING_QUEUES is enabled.
+        # (Issue #1079)
+        if queue in queues:
+            q = queues[queue]
+        else:
             exchange = queue if exchange is None else exchange
-            exchange_type = 'direct' if exchange_type is None \
-                                     else exchange_type
-            q = self.app.amqp.queues.select_add(queue,
-                    exchange=exchange,
-                    exchange_type=exchange_type,
-                    routing_key=routing_key, **options)
+            exchange_type = ('direct' if   exchange_type is None
+                                      else exchange_type)
+            q = queues.select_add(queue,
+                                  exchange=exchange,
+                                  exchange_type=exchange_type,
+                                  routing_key=routing_key, **options)
         if not cset.consuming_from(queue):
             cset.add_queue(q)
             cset.consume()