ソースを参照

App: Queues: Make sure exchange/routing_key is set by default

Ask Solem 8 年 前
コミット
a9608c49f8
2 ファイル変更21 行追加17 行削除
  1. 20 16
      celery/app/amqp.py
  2. 1 1
      t/unit/app/test_amqp.py

+ 20 - 16
celery/app/amqp.py

@@ -68,15 +68,16 @@ class Queues(dict):
 
     def __init__(self, queues=None, default_exchange=None,
                  create_missing=True, ha_policy=None, autoexchange=None,
-                 max_priority=None):
+                 max_priority=None, default_routing_key=None):
         dict.__init__(self)
         self.aliases = WeakValueDictionary()
         self.default_exchange = default_exchange
+        self.default_routing_key = default_routing_key
         self.create_missing = create_missing
         self.ha_policy = ha_policy
         self.autoexchange = Exchange if autoexchange is None else autoexchange
         self.max_priority = max_priority
-        if isinstance(queues, (tuple, list)):
+        if queues is not None and not isinstance(queues, Mapping):
             queues = {q.name: q for q in queues}
         for name, q in items(queues or {}):
             self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
@@ -118,6 +119,20 @@ class Queues(dict):
         """
         if not isinstance(queue, Queue):
             return self.add_compat(queue, **kwargs)
+        return self._add(queue)
+
+    def add_compat(self, name, **options):
+        # docs used to use binding_key as routing key
+        options.setdefault('routing_key', options.get('binding_key'))
+        if options['routing_key'] is None:
+            options['routing_key'] = name
+        return self._add(Queue.from_dict(name, **options))
+
+    def _add(self, queue):
+        if not queue.routing_key:
+            if queue.exchange is None or queue.exchange.name == '':
+                queue.exchange = self.default_exchange
+            queue.routing_key = self.default_routing_key
         if self.ha_policy:
             if queue.queue_arguments is None:
                 queue.queue_arguments = {}
@@ -129,18 +144,6 @@ class Queues(dict):
         self[queue.name] = queue
         return queue
 
-    def add_compat(self, name, **options):
-        # docs used to use binding_key as routing key
-        options.setdefault('routing_key', options.get('binding_key'))
-        if options['routing_key'] is None:
-            options['routing_key'] = name
-        if self.ha_policy is not None:
-            self._set_ha_policy(options.setdefault('queue_arguments', {}))
-        if self.max_priority is not None:
-            self._set_max_priority(options.setdefault('queue_arguments', {}))
-        q = self[name] = Queue.from_dict(name, **options)
-        return q
-
     def _set_ha_policy(self, args):
         policy = self.ha_policy
         if isinstance(policy, (list, tuple)):
@@ -263,6 +266,7 @@ class AMQP(object):
         # Create new :class:`Queues` instance, using queue defaults
         # from the current configuration.
         conf = self.app.conf
+        default_routing_key = conf.task_default_routing_key
         if create_missing is None:
             create_missing = conf.task_create_missing_queues
         if ha_policy is None:
@@ -272,12 +276,12 @@ class AMQP(object):
         if not queues and conf.task_default_queue:
             queues = (Queue(conf.task_default_queue,
                             exchange=self.default_exchange,
-                            routing_key=conf.task_default_routing_key),)
+                            routing_key=default_routing_key),)
         autoexchange = (self.autoexchange if autoexchange is None
                         else autoexchange)
         return self.queues_cls(
             queues, self.default_exchange, create_missing,
-            ha_policy, autoexchange, max_priority,
+            ha_policy, autoexchange, max_priority, default_routing_key,
         )
 
     def Router(self, queues=None, create_missing=None):

+ 1 - 1
t/unit/app/test_amqp.py

@@ -127,7 +127,7 @@ class test_Queues:
         ex = Exchange('fff', 'fanout')
         q = Queues(default_exchange=ex)
         q.add(Queue('foo'))
-        assert q['foo'].exchange.name == ''
+        assert q['foo'].exchange.name == 'fff'
 
     def test_alias(self):
         q = Queues()