Browse Source

Add consumer now uses amqp.queues.select_add to make sure the new queue is added to _consume_from

Ask Solem 12 years ago
parent
commit
d7ea7e0a14
3 changed files with 12 additions and 3 deletions
  1. 10 0
      celery/app/amqp.py
  2. 1 2
      celery/worker/consumer.py
  3. 1 1
      docs/userguide/signals.rst

+ 10 - 0
celery/app/amqp.py

@@ -92,6 +92,8 @@ class Queues(dict):
     def add_compat(self, name, **options):
         # docs used to use binding_key as routing key
         options.setdefault('routing_key', options.get('binding_key'))
+        if options['routing_key'] is None:
+            options['routing_key'] = name
         q = self[name] = entry_to_queue(name, **options)
         return q
 
@@ -110,6 +112,14 @@ class Queues(dict):
             return textindent('\n'.join(info), indent)
         return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)
 
+    def select_add(self, queue, **kwargs):
+        """Add new task queue that will be consumed from even when
+        a subset has been selected using the :option:`-Q` option."""
+        q = self.add(queue, **kwargs)
+        if self._consume_from is not None:
+            self._consume_from[q.name] = q
+        return q
+
     def select_subset(self, wanted):
         """Sets :attr:`consume_from` by selecting a subset of the
         currently defined queues.

+ 1 - 2
celery/worker/consumer.py

@@ -771,10 +771,9 @@ class Consumer(object):
             q = self.app.amqp.queues[queue]
         except KeyError:
             exchange = queue if exchange is None else exchange
-            routing_key = queue if routing_key is None else routing_key
             exchange_type = 'direct' if exchange_type is None \
                                      else exchange_type
-            q = self.app.amqp.queues.add(queue,
+            q = self.app.amqp.queues.select_add(queue,
                     exchange=exchange,
                     exchange_type=exchange_type,
                     routing_key=routing_key, **options)

+ 1 - 1
docs/userguide/signals.rst

@@ -226,7 +226,7 @@ used to route a task to any specific worker:
     @celeryd_after_setup.connect
     def setup_direct_queue(sender, instance, **kwargs):
         queue_name = '%s.dq' % sender   # sender is the hostname of the worker
-        instance.app.queues.add(queue_name, routing_key=queue_name)
+        instance.app.queues.select_add(queue_name)
 
 Provides arguments: