Browse Source

Make queue argument to apply_async, and Task.queue work properly

Ask Solem 14 years ago
parent
commit
e3e0ef20ee
2 changed files with 27 additions and 2 deletions
  1. 8 2
      celery/routes.py
  2. 19 0
      celery/tests/test_routes.py

+ 8 - 2
celery/routes.py

@@ -4,6 +4,12 @@ from celery.utils import instantiate, firstmethod, mpromise
 _first_route = firstmethod("route_for_task")
 
 
+def merge(a, b):
+    """Like ``dict(a, **b)`` except it will keep values from ``a``,
+    if the value in ``b`` is :const:`None`."""
+    return dict(a, **dict((k, v) for k, v in b.iteritems() if v is not None))
+
+
 class MapRoute(object):
     """Makes a router out of a :class:`dict`."""
 
@@ -37,7 +43,7 @@ class Router(object):
             route = self.lookup_route(task, args, kwargs)
             if route:
                 # Also expand "queue" keys in route.
-                return dict(options, **self.expand_destination(route))
+                return merge(options, self.expand_destination(route))
         return options
 
     def expand_destination(self, route):
@@ -60,7 +66,7 @@ class Router(object):
                     raise QueueNotFound(
                         "Queue '%s' is not defined in CELERY_QUEUES" % queue)
             dest.setdefault("routing_key", dest.get("binding_key"))
-            return dict(dest, **route)
+            return merge(dest, route)
 
         return route
 

+ 19 - 0
celery/tests/test_routes.py

@@ -76,6 +76,25 @@ class test_lookup_route(unittest.TestCase):
                 router.route({}, "celery.ping",
                     args=[1, 2], kwargs={}))
 
+    @with_queues()
+    def test_expands_queue_in_options(self):
+        R = routes.prepare(())
+        router = routes.Router(R, conf.QUEUES, create_missing=True)
+        # apply_async forwards all arguments, even exchange=None etc,
+        # so need to make sure it's merged correctly.
+        route = router.route({"queue": "testq",
+                              "exchange": None,
+                              "routing_key": None,
+                              "immediate": False},
+                             "celery.ping",
+                             args=[1, 2], kwargs={})
+        self.assertDictContainsSubset({"exchange": "testq",
+                                       "routing_key": "testq",
+                                       "immediate": False},
+                                       route)
+        self.assertNotIn("queue", route)
+
+
     @with_queues(foo=a_queue, bar=b_queue)
     def test_lookup_paths_traversed(self):
         R = routes.prepare(({"celery.xaza": {"queue": "bar"}},