|
@@ -6,10 +6,16 @@ from functools import wraps
|
|
|
from celery import routes
|
|
|
from celery import current_app
|
|
|
from celery.exceptions import QueueNotFound
|
|
|
+from celery.task import task
|
|
|
from celery.utils import maybe_promise
|
|
|
from celery.tests.utils import Case
|
|
|
|
|
|
|
|
|
+@task
|
|
|
+def mytask():
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
def E(queues):
|
|
|
def expand(answer):
|
|
|
return routes.Router([], queues).expand_destination(answer)
|
|
@@ -52,17 +58,17 @@ class test_MapRoute(Case):
|
|
|
@with_queues(foo=a_queue, bar=b_queue)
|
|
|
def test_route_for_task_expanded_route(self):
|
|
|
expand = E(current_app.conf.CELERY_QUEUES)
|
|
|
- route = routes.MapRoute({"celery.ping": {"queue": "foo"}})
|
|
|
+ route = routes.MapRoute({mytask.name: {"queue": "foo"}})
|
|
|
self.assertDictContainsSubset(a_queue,
|
|
|
- expand(route.route_for_task("celery.ping")))
|
|
|
+ expand(route.route_for_task(mytask.name)))
|
|
|
self.assertIsNone(route.route_for_task("celery.awesome"))
|
|
|
|
|
|
@with_queues(foo=a_queue, bar=b_queue)
|
|
|
def test_route_for_task(self):
|
|
|
expand = E(current_app.conf.CELERY_QUEUES)
|
|
|
- route = routes.MapRoute({"celery.ping": b_queue})
|
|
|
+ route = routes.MapRoute({mytask.name: b_queue})
|
|
|
self.assertDictContainsSubset(b_queue,
|
|
|
- expand(route.route_for_task("celery.ping")))
|
|
|
+ expand(route.route_for_task(mytask.name)))
|
|
|
self.assertIsNone(route.route_for_task("celery.awesome"))
|
|
|
|
|
|
def test_expand_route_not_found(self):
|
|
@@ -80,11 +86,11 @@ class test_lookup_route(Case):
|
|
|
|
|
|
@with_queues(foo=a_queue, bar=b_queue)
|
|
|
def test_lookup_takes_first(self):
|
|
|
- R = routes.prepare(({"celery.ping": {"queue": "bar"}},
|
|
|
- {"celery.ping": {"queue": "foo"}}))
|
|
|
+ R = routes.prepare(({mytask.name: {"queue": "bar"}},
|
|
|
+ {mytask.name: {"queue": "foo"}}))
|
|
|
router = routes.Router(R, current_app.conf.CELERY_QUEUES)
|
|
|
self.assertDictContainsSubset(b_queue,
|
|
|
- router.route({}, "celery.ping",
|
|
|
+ router.route({}, mytask.name,
|
|
|
args=[1, 2], kwargs={}))
|
|
|
|
|
|
@with_queues()
|
|
@@ -98,7 +104,7 @@ class test_lookup_route(Case):
|
|
|
"exchange": None,
|
|
|
"routing_key": None,
|
|
|
"immediate": False},
|
|
|
- "celery.ping",
|
|
|
+ mytask.name,
|
|
|
args=[1, 2], kwargs={})
|
|
|
self.assertDictContainsSubset({"exchange": "testq",
|
|
|
"routing_key": "testq",
|
|
@@ -116,10 +122,10 @@ class test_lookup_route(Case):
|
|
|
current_app.conf.CELERY_DEFAULT_QUEUE: d_queue})
|
|
|
def test_lookup_paths_traversed(self):
|
|
|
R = routes.prepare(({"celery.xaza": {"queue": "bar"}},
|
|
|
- {"celery.ping": {"queue": "foo"}}))
|
|
|
+ {mytask.name: {"queue": "foo"}}))
|
|
|
router = routes.Router(R, current_app.amqp.queues)
|
|
|
self.assertDictContainsSubset(a_queue,
|
|
|
- router.route({}, "celery.ping",
|
|
|
+ router.route({}, mytask.name,
|
|
|
args=[1, 2], kwargs={}))
|
|
|
self.assertEqual(router.route({}, "celery.poza"),
|
|
|
dict(d_queue, queue=current_app.conf.CELERY_DEFAULT_QUEUE))
|