|
@@ -12,6 +12,10 @@ from celery.task import task
|
|
|
from celery.tests.utils import Case
|
|
|
|
|
|
|
|
|
+def Router(*args, **kwargs):
|
|
|
+ return routes.Router(*args, **dict(kwargs, app=current_app))
|
|
|
+
|
|
|
+
|
|
|
@task()
|
|
|
def mytask():
|
|
|
pass
|
|
@@ -19,7 +23,7 @@ def mytask():
|
|
|
|
|
|
def E(queues):
|
|
|
def expand(answer):
|
|
|
- return routes.Router([], queues).expand_destination(answer)
|
|
|
+ return Router([], queues).expand_destination(answer)
|
|
|
return expand
|
|
|
|
|
|
|
|
@@ -91,22 +95,21 @@ class test_MapRoute(RouteCase):
|
|
|
class test_lookup_route(RouteCase):
|
|
|
|
|
|
def test_init_queues(self):
|
|
|
- router = routes.Router(queues=None)
|
|
|
+ router = Router(queues=None)
|
|
|
self.assertDictEqual(router.queues, {})
|
|
|
|
|
|
@with_queues(foo=a_queue, bar=b_queue)
|
|
|
def test_lookup_takes_first(self):
|
|
|
R = routes.prepare(({mytask.name: {'queue': 'bar'}},
|
|
|
{mytask.name: {'queue': 'foo'}}))
|
|
|
- router = routes.Router(R, current_app.amqp.queues)
|
|
|
+ router = Router(R, current_app.amqp.queues)
|
|
|
self.assertAnswer(router.route({}, mytask.name,
|
|
|
args=[1, 2], kwargs={}), b_queue)
|
|
|
|
|
|
@with_queues()
|
|
|
def test_expands_queue_in_options(self):
|
|
|
R = routes.prepare(())
|
|
|
- router = routes.Router(R, current_app.amqp.queues,
|
|
|
- create_missing=True)
|
|
|
+ router = Router(R, current_app.amqp.queues, create_missing=True)
|
|
|
# apply_async forwards all arguments, even exchange=None etc,
|
|
|
# so need to make sure it's merged correctly.
|
|
|
route = router.route({'queue': 'testq',
|
|
@@ -123,7 +126,7 @@ class test_lookup_route(RouteCase):
|
|
|
|
|
|
@with_queues(foo=a_queue, bar=b_queue)
|
|
|
def test_expand_destination_string(self):
|
|
|
- x = routes.Router({}, current_app.amqp.queues)
|
|
|
+ x = Router({}, current_app.amqp.queues)
|
|
|
dest = x.expand_destination('foo')
|
|
|
self.assertEqual(dest['exchange'].name, 'fooexchange')
|
|
|
|
|
@@ -132,7 +135,7 @@ class test_lookup_route(RouteCase):
|
|
|
def test_lookup_paths_traversed(self):
|
|
|
R = routes.prepare(({'celery.xaza': {'queue': 'bar'}},
|
|
|
{mytask.name: {'queue': 'foo'}}))
|
|
|
- router = routes.Router(R, current_app.amqp.queues)
|
|
|
+ router = Router(R, current_app.amqp.queues)
|
|
|
self.assertAnswer(router.route({}, mytask.name,
|
|
|
args=[1, 2], kwargs={}), a_queue)
|
|
|
self.assertAnswer(router.route({}, 'celery.poza'),
|