Browse Source

Merge branch '3.0'

Conflicts:
	celery/worker/consumer.py
	docs/userguide/signals.rst
Ask Solem 12 years ago
parent
commit
f8119b329f
4 changed files with 23 additions and 13 deletions
  1. 10 0
      celery/app/amqp.py
  2. 1 2
      celery/worker/consumer.py
  3. 1 1
      docs/userguide/signals.rst
  4. 11 10
      setup.py

+ 10 - 0
celery/app/amqp.py

@@ -92,6 +92,8 @@ class Queues(dict):
     def add_compat(self, name, **options):
         # docs used to use binding_key as routing key
         options.setdefault('routing_key', options.get('binding_key'))
+        if options['routing_key'] is None:
+            options['routing_key'] = name
         q = self[name] = entry_to_queue(name, **options)
         return q
 
@@ -110,6 +112,14 @@ class Queues(dict):
             return textindent('\n'.join(info), indent)
         return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)
 
+    def select_add(self, queue, **kwargs):
+        """Add new task queue that will be consumed from even when
+        a subset has been selected using the :option:`-Q` option."""
+        q = self.add(queue, **kwargs)
+        if self._consume_from is not None:
+            self._consume_from[q.name] = q
+        return q
+
     def select_subset(self, wanted):
         """Sets :attr:`consume_from` by selecting a subset of the
         currently defined queues.

+ 1 - 2
celery/worker/consumer.py

@@ -771,10 +771,9 @@ class Consumer(object):
             q = self.app.amqp.queues[queue]
         except KeyError:
             exchange = queue if exchange is None else exchange
-            routing_key = queue if routing_key is None else routing_key
             exchange_type = 'direct' if exchange_type is None \
                                      else exchange_type
-            q = self.app.amqp.queues.add(queue,
+            q = self.app.amqp.queues.select_add(queue,
                     exchange=exchange,
                     exchange_type=exchange_type,
                     routing_key=routing_key, **options)

+ 1 - 1
docs/userguide/signals.rst

@@ -226,7 +226,7 @@ used to route a task to any specific worker:
     @celeryd_after_setup.connect
     def setup_direct_queue(sender, instance, **kwargs):
         queue_name = '%s.dq' % sender   # sender is the hostname of the worker
-        instance.app.queues.add(queue_name, routing_key=queue_name)
+        instance.app.queues.select_add(queue_name)
 
 Provides arguments:
 

+ 11 - 10
setup.py

@@ -1,5 +1,16 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
+
+try:
+    from setuptools import setup, find_packages
+    from setuptools.command.test import test
+except ImportError:
+    raise
+    from ez_setup import use_setuptools
+    use_setuptools()
+    from setuptools import setup, find_packages           # noqa
+    from setuptools.command.test import test              # noqa
+
 import os
 import sys
 import codecs
@@ -33,16 +44,6 @@ except:
     pass
 
 
-try:
-    from setuptools import setup, find_packages
-    from setuptools.command.test import test
-except ImportError:
-    raise
-    from ez_setup import use_setuptools
-    use_setuptools()
-    from setuptools import setup, find_packages           # noqa
-    from setuptools.command.test import test              # noqa
-
 NAME = 'celery'
 entrypoints = {}
 extra = {}