from __future__ import absolute_import, unicode_literals import pytest from case import ANY, Mock from kombu import Exchange, Queue from kombu.utils.functional import maybe_evaluate from celery.app import routes from celery.exceptions import QueueNotFound from celery.five import items from celery.utils.imports import qualname def Router(app, *args, **kwargs): return routes.Router(*args, app=app, **kwargs) def E(app, queues): def expand(answer): return Router(app, [], queues).expand_destination(answer) return expand def set_queues(app, **queues): app.conf.task_queues = queues app.amqp.queues = app.amqp.Queues(queues) class RouteCase: def setup(self): self.a_queue = { 'exchange': 'fooexchange', 'exchange_type': 'fanout', 'routing_key': 'xuzzy', } self.b_queue = { 'exchange': 'barexchange', 'exchange_type': 'topic', 'routing_key': 'b.b.#', } self.d_queue = { 'exchange': self.app.conf.task_default_exchange, 'exchange_type': self.app.conf.task_default_exchange_type, 'routing_key': self.app.conf.task_default_routing_key, } @self.app.task(shared=False) def mytask(*args, **kwargs): pass self.mytask = mytask def assert_routes_to_queue(self, queue, router, name, args=[], kwargs={}, options={}): assert router.route(options, name, args, kwargs)['queue'].name == queue def assert_routes_to_default_queue(self, router, name, *args, **kwargs): self.assert_routes_to_queue( self.app.conf.task_default_queue, router, name, *args, **kwargs) class test_MapRoute(RouteCase): def test_route_for_task_expanded_route(self): set_queues(self.app, foo=self.a_queue, bar=self.b_queue) expand = E(self.app, self.app.amqp.queues) route = routes.MapRoute({self.mytask.name: {'queue': 'foo'}}) assert expand(route(self.mytask.name))['queue'].name == 'foo' assert route('celery.awesome') is None def test_route_for_task(self): set_queues(self.app, foo=self.a_queue, bar=self.b_queue) expand = E(self.app, self.app.amqp.queues) route = routes.MapRoute({self.mytask.name: self.b_queue}) eroute = expand(route(self.mytask.name)) for key, value in items(self.b_queue): assert eroute[key] == value assert route('celery.awesome') is None def test_route_for_task__glob(self): route = routes.MapRoute([ ('proj.tasks.*', 'routeA'), ('demoapp.tasks.bar.*', {'exchange': 'routeB'}), ]) assert route('proj.tasks.foo') == {'queue': 'routeA'} assert route('demoapp.tasks.bar.moo') == {'exchange': 'routeB'} assert route('demoapp.foo.bar.moo') is None def test_expand_route_not_found(self): expand = E(self.app, self.app.amqp.Queues( self.app.conf.task_queues, False)) route = routes.MapRoute({'a': {'queue': 'x'}}) with pytest.raises(QueueNotFound): expand(route('a')) class test_lookup_route(RouteCase): def test_init_queues(self): router = Router(self.app, queues=None) assert router.queues == {} def test_lookup_takes_first(self): set_queues(self.app, foo=self.a_queue, bar=self.b_queue) R = routes.prepare(({self.mytask.name: {'queue': 'bar'}}, {self.mytask.name: {'queue': 'foo'}})) router = Router(self.app, R, self.app.amqp.queues) self.assert_routes_to_queue('bar', router, self.mytask.name) def test_expands_queue_in_options(self): set_queues(self.app) R = routes.prepare(()) router = Router( self.app, R, self.app.amqp.queues, create_missing=True, ) # apply_async forwards all arguments, even exchange=None etc, # so need to make sure it's merged correctly. route = router.route( {'queue': 'testq', 'exchange': None, 'routing_key': None, 'immediate': False}, self.mytask.name, args=[1, 2], kwargs={}, ) assert route['queue'].name == 'testq' assert route['queue'].exchange == Exchange('testq') assert route['queue'].routing_key == 'testq' assert route['immediate'] is False def test_expand_destination_string(self): set_queues(self.app, foo=self.a_queue, bar=self.b_queue) x = Router(self.app, {}, self.app.amqp.queues) dest = x.expand_destination('foo') assert dest['queue'].name == 'foo' def test_expand_destination__Queue(self): queue = Queue('foo') x = Router(self.app, {}, self.app.amqp.queues) dest = x.expand_destination({'queue': queue}) assert dest['queue'] is queue def test_lookup_paths_traversed(self): self.simple_queue_setup() R = routes.prepare(( {'celery.xaza': {'queue': 'bar'}}, {self.mytask.name: {'queue': 'foo'}} )) router = Router(self.app, R, self.app.amqp.queues) self.assert_routes_to_queue('foo', router, self.mytask.name) self.assert_routes_to_default_queue(router, 'celery.poza') def test_compat_router_class(self): self.simple_queue_setup() R = routes.prepare(( TestRouter(), )) router = Router(self.app, R, self.app.amqp.queues) self.assert_routes_to_queue('bar', router, 'celery.xaza') self.assert_routes_to_default_queue(router, 'celery.poza') def test_router_fun__called_with(self): self.simple_queue_setup() step = Mock(spec=['__call__']) step.return_value = None R = routes.prepare([step]) router = Router(self.app, R, self.app.amqp.queues) self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3) step.assert_called_with( self.mytask.name, (2, 2), {'kw': 3}, ANY, task=self.mytask, ) options = step.call_args[0][3] assert options['priority'] == 3 def test_compat_router_classes__called_with(self): self.simple_queue_setup() step = Mock(spec=['route_for_task']) step.route_for_task.return_value = None R = routes.prepare([step]) router = Router(self.app, R, self.app.amqp.queues) self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3) step.route_for_task.assert_called_with( self.mytask.name, (2, 2), {'kw': 3}, ) def simple_queue_setup(self): set_queues( self.app, foo=self.a_queue, bar=self.b_queue, **{self.app.conf.task_default_queue: self.d_queue}) class TestRouter(object): def route_for_task(self, task, args, kwargs): if task == 'celery.xaza': return 'bar' class test_prepare: def test_prepare(self): o = object() R = [ {'foo': 'bar'}, qualname(TestRouter), o, ] p = routes.prepare(R) assert isinstance(p[0], routes.MapRoute) assert isinstance(maybe_evaluate(p[1]), TestRouter) assert p[2] is o assert routes.prepare(o) == [o] def test_prepare_item_is_dict(self): R = {'foo': 'bar'} p = routes.prepare(R) assert isinstance(p[0], routes.MapRoute)