| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 | from __future__ import absolute_import, unicode_literalsimport pytestfrom case import ANY, Mockfrom kombu import Exchange, Queuefrom kombu.utils.functional import maybe_evaluatefrom celery.app import routesfrom celery.exceptions import QueueNotFoundfrom celery.five import itemsfrom celery.utils.imports import qualnamedef Router(app, *args, **kwargs):    return routes.Router(*args, app=app, **kwargs)def E(app, queues):    def expand(answer):        return Router(app, [], queues).expand_destination(answer)    return expanddef set_queues(app, **queues):    app.conf.task_queues = queues    app.amqp.queues = app.amqp.Queues(queues)class RouteCase:    def setup(self):        self.a_queue = {            'exchange': 'fooexchange',            'exchange_type': 'fanout',            'routing_key': 'xuzzy',        }        self.b_queue = {            'exchange': 'barexchange',            'exchange_type': 'topic',            'routing_key': 'b.b.#',        }        self.d_queue = {            'exchange': self.app.conf.task_default_exchange,            'exchange_type': self.app.conf.task_default_exchange_type,            'routing_key': self.app.conf.task_default_routing_key,        }        @self.app.task(shared=False)        def mytask(*args, **kwargs):            pass        self.mytask = mytask    def assert_routes_to_queue(self, queue, router, name,                               args=[], kwargs={}, options={}):        assert router.route(options, name, args, kwargs)['queue'].name == queue    def assert_routes_to_default_queue(self, router, name, *args, **kwargs):        self.assert_routes_to_queue(            self.app.conf.task_default_queue, router, name, *args, **kwargs)class test_MapRoute(RouteCase):    def test_route_for_task_expanded_route(self):        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)        expand = E(self.app, self.app.amqp.queues)        route = routes.MapRoute({self.mytask.name: {'queue': 'foo'}})        assert expand(route(self.mytask.name))['queue'].name == 'foo'        assert route('celery.awesome') is None    def test_route_for_task(self):        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)        expand = E(self.app, self.app.amqp.queues)        route = routes.MapRoute({self.mytask.name: self.b_queue})        eroute = expand(route(self.mytask.name))        for key, value in items(self.b_queue):            assert eroute[key] == value        assert route('celery.awesome') is None    def test_route_for_task__glob(self):        route = routes.MapRoute([            ('proj.tasks.*', 'routeA'),            ('demoapp.tasks.bar.*', {'exchange': 'routeB'}),        ])        assert route('proj.tasks.foo') == {'queue': 'routeA'}        assert route('demoapp.tasks.bar.moo') == {'exchange': 'routeB'}        assert route('demoapp.foo.bar.moo') is None    def test_expand_route_not_found(self):        expand = E(self.app, self.app.amqp.Queues(                   self.app.conf.task_queues, False))        route = routes.MapRoute({'a': {'queue': 'x'}})        with pytest.raises(QueueNotFound):            expand(route('a'))class test_lookup_route(RouteCase):    def test_init_queues(self):        router = Router(self.app, queues=None)        assert router.queues == {}    def test_lookup_takes_first(self):        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)        R = routes.prepare(({self.mytask.name: {'queue': 'bar'}},                            {self.mytask.name: {'queue': 'foo'}}))        router = Router(self.app, R, self.app.amqp.queues)        self.assert_routes_to_queue('bar', router, self.mytask.name)    def test_expands_queue_in_options(self):        set_queues(self.app)        R = routes.prepare(())        router = Router(            self.app, R, self.app.amqp.queues, create_missing=True,        )        # apply_async forwards all arguments, even exchange=None etc,        # so need to make sure it's merged correctly.        route = router.route(            {'queue': 'testq',             'exchange': None,             'routing_key': None,             'immediate': False},            self.mytask.name,            args=[1, 2], kwargs={},        )        assert route['queue'].name == 'testq'        assert route['queue'].exchange == Exchange('testq')        assert route['queue'].routing_key == 'testq'        assert route['immediate'] is False    def test_expand_destination_string(self):        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)        x = Router(self.app, {}, self.app.amqp.queues)        dest = x.expand_destination('foo')        assert dest['queue'].name == 'foo'    def test_expand_destination__Queue(self):        queue = Queue('foo')        x = Router(self.app, {}, self.app.amqp.queues)        dest = x.expand_destination({'queue': queue})        assert dest['queue'] is queue    def test_lookup_paths_traversed(self):        self.simple_queue_setup()        R = routes.prepare((            {'celery.xaza': {'queue': 'bar'}},            {self.mytask.name: {'queue': 'foo'}}        ))        router = Router(self.app, R, self.app.amqp.queues)        self.assert_routes_to_queue('foo', router, self.mytask.name)        self.assert_routes_to_default_queue(router, 'celery.poza')    def test_compat_router_class(self):        self.simple_queue_setup()        R = routes.prepare((            TestRouter(),        ))        router = Router(self.app, R, self.app.amqp.queues)        self.assert_routes_to_queue('bar', router, 'celery.xaza')        self.assert_routes_to_default_queue(router, 'celery.poza')    def test_router_fun__called_with(self):        self.simple_queue_setup()        step = Mock(spec=['__call__'])        step.return_value = None        R = routes.prepare([step])        router = Router(self.app, R, self.app.amqp.queues)        self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3)        step.assert_called_with(            self.mytask.name, (2, 2), {'kw': 3}, ANY,            task=self.mytask,        )        options = step.call_args[0][3]        assert options['priority'] == 3    def test_compat_router_classes__called_with(self):        self.simple_queue_setup()        step = Mock(spec=['route_for_task'])        step.route_for_task.return_value = None        R = routes.prepare([step])        router = Router(self.app, R, self.app.amqp.queues)        self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3)        step.route_for_task.assert_called_with(            self.mytask.name, (2, 2), {'kw': 3},        )    def simple_queue_setup(self):        set_queues(            self.app, foo=self.a_queue, bar=self.b_queue,            **{self.app.conf.task_default_queue: self.d_queue})class TestRouter(object):    def route_for_task(self, task, args, kwargs):        if task == 'celery.xaza':            return 'bar'class test_prepare:    def test_prepare(self):        o = object()        R = [            {'foo': 'bar'},            qualname(TestRouter),            o,        ]        p = routes.prepare(R)        assert isinstance(p[0], routes.MapRoute)        assert isinstance(maybe_evaluate(p[1]), TestRouter)        assert p[2] is o        assert routes.prepare(o) == [o]    def test_prepare_item_is_dict(self):        R = {'foo': 'bar'}        p = routes.prepare(R)        assert isinstance(p[0], routes.MapRoute)
 |