Forráskód Böngészése

Task.routing_key and other routing attributes were not respected

Ask Solem 14 éve
szülő
commit
f100c63419
2 módosított fájl, 6 hozzáadás és 3 törlés
  1. 2 2
      celery/routes.py
  2. 4 1
      celery/tests/test_app/test_routes.py

+ 2 - 2
celery/routes.py

@@ -37,8 +37,8 @@ class Router(object):
             if route:  # expands 'queue' in route.
                 return lpmerge(self.expand_destination(route), options)
         if "queue" not in options:
-            options.update(self.expand_destination(
-                            self.app.conf.CELERY_DEFAULT_QUEUE))
+            options = lpmerge(self.expand_destination(
+                                self.app.conf.CELERY_DEFAULT_QUEUE), options)
         return options
 
     def expand_destination(self, route):

+ 4 - 1
celery/tests/test_app/test_routes.py

@@ -20,11 +20,14 @@ def with_queues(**queues):
         def __inner(*args, **kwargs):
             app = current_app
             prev_queues = app.conf.CELERY_QUEUES
+            prev_Queues = app.amqp.queues
             app.conf.CELERY_QUEUES = queues
+            app.amqp.queues = app.amqp.Queues(queues)
             try:
                 return fun(*args, **kwargs)
             finally:
                 app.conf.CELERY_QUEUES = prev_queues
+                app.amqp.queues = prev_Queues
         return __inner
     return patch_fun
 
@@ -109,7 +112,7 @@ class test_lookup_route(unittest.TestCase):
     def test_lookup_paths_traversed(self):
         R = routes.prepare(({"celery.xaza": {"queue": "bar"}},
                             {"celery.ping": {"queue": "foo"}}))
-        router = routes.Router(R, current_app.conf.CELERY_QUEUES)
+        router = routes.Router(R, current_app.amqp.queues)
         self.assertDictContainsSubset(a_queue,
                 router.route({}, "celery.ping",
                     args=[1, 2], kwargs={}))