| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 | 
							- from __future__ import absolute_import, unicode_literals
 
- import pytest
 
- from case import ANY, Mock
 
- from kombu import Exchange, Queue
 
- from kombu.utils.functional import maybe_evaluate
 
- from celery.app import routes
 
- from celery.exceptions import QueueNotFound
 
- from celery.five import items
 
- from celery.utils.imports import qualname
 
def Router(app, *args, **kwargs):
    """Create a ``routes.Router`` bound to ``app``.

    Positional and keyword arguments are forwarded unchanged; ``app`` is
    always passed through as the ``app`` keyword argument.
    """
    options = dict(kwargs, app=app)
    return routes.Router(*args, **options)
 
def E(app, queues):
    """Return a callable that expands destinations against ``queues``.

    Every call of the returned function builds a fresh router for ``app``
    with an empty route list and returns the result of its
    ``expand_destination`` for the given answer.
    """
    def expand(answer):
        router = Router(app, [], queues)
        return router.expand_destination(answer)

    return expand
 
def set_queues(app, **queues):
    """Install ``queues`` on the app config and on ``app.amqp``.

    The keyword arguments are stored as ``app.conf.task_queues`` and the
    result of ``app.amqp.Queues(queues)`` becomes ``app.amqp.queues``.
    """
    # Update the configuration before touching ``app.amqp``.
    app.conf.task_queues = queues
    amqp = app.amqp
    amqp.queues = amqp.Queues(queues)
 
class RouteCase:
    """Shared fixtures and assertion helpers for the routing tests.

    The methods read ``self.app``, which this class does not assign; it
    has to be provided from outside before ``setup`` runs.
    """

    def setup(self):
        """Define three queue option dicts and register a no-op task."""
        self.a_queue = {
            'exchange': 'fooexchange',
            'exchange_type': 'fanout',
            'routing_key': 'xuzzy',
        }
        self.b_queue = {
            'exchange': 'barexchange',
            'exchange_type': 'topic',
            'routing_key': 'b.b.#',
        }
        # Built from the app's configured default exchange settings.
        self.d_queue = {
            'exchange': self.app.conf.task_default_exchange,
            'exchange_type': self.app.conf.task_default_exchange_type,
            'routing_key': self.app.conf.task_default_routing_key,
        }

        @self.app.task(shared=False)
        def mytask(*args, **kwargs):
            pass
        self.mytask = mytask

    def assert_routes_to_queue(self, queue, router, name,
                               args=None, kwargs=None, options=None):
        """Assert that ``router`` routes ``name`` to the queue ``queue``.

        Calls ``router.route(options, name, args, kwargs)`` and compares
        the ``name`` attribute of the returned ``'queue'`` entry with
        ``queue``.

        :param queue: expected queue name.
        :param router: object providing ``route``.
        :param name: task name to route.
        :param args: positional task arguments (default: new empty list).
        :param kwargs: keyword task arguments (default: new empty dict).
        :param options: routing options (default: new empty dict).
        :raises AssertionError: if the routed queue name differs.
        """
        # Create the containers per call: mutable default values are
        # shared between calls, so anything a router did to them would
        # leak into later assertions.
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        options = {} if options is None else options
        route = router.route(options, name, args, kwargs)
        assert route['queue'].name == queue

    def assert_routes_to_default_queue(self, router, name, *args, **kwargs):
        """Assert that ``name`` routes to ``app.conf.task_default_queue``.

        Extra arguments are forwarded to ``assert_routes_to_queue``.
        """
        self.assert_routes_to_queue(
            self.app.conf.task_default_queue, router, name, *args, **kwargs)
 
class test_MapRoute(RouteCase):
    """Tests for ``routes.MapRoute`` lookups and destination expansion."""

    def test_route_for_task_expanded_route(self):
        """A ``{'queue': name}`` route expands to the configured queue."""
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        expand = E(self.app, self.app.amqp.queues)
        task_name = self.mytask.name
        route = routes.MapRoute({task_name: {'queue': 'foo'}})
        expanded = expand(route(task_name))
        assert expanded['queue'].name == 'foo'
        # Names missing from the map produce no route.
        assert route('celery.awesome') is None

    def test_route_for_task(self):
        """A route given as an options dict keeps all of its options."""
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        expand = E(self.app, self.app.amqp.queues)
        task_name = self.mytask.name
        route = routes.MapRoute({task_name: self.b_queue})
        expanded = expand(route(task_name))
        for option, expected in items(self.b_queue):
            assert expanded[option] == expected
        assert route('celery.awesome') is None

    def test_route_for_task__glob(self):
        """Glob patterns match task names; string targets become queues."""
        patterns = [
            ('proj.tasks.*', 'routeA'),
            ('demoapp.tasks.bar.*', {'exchange': 'routeB'}),
        ]
        route = routes.MapRoute(patterns)
        assert route('proj.tasks.foo') == {'queue': 'routeA'}
        assert route('demoapp.tasks.bar.moo') == {'exchange': 'routeB'}
        assert route('demoapp.foo.bar.moo') is None

    def test_expand_route_not_found(self):
        """Expanding a route to an unknown queue raises QueueNotFound."""
        # ``Queues`` is built with ``False`` as its second argument here.
        queues = self.app.amqp.Queues(self.app.conf.task_queues, False)
        expand = E(self.app, queues)
        route = routes.MapRoute({'a': {'queue': 'x'}})
        with pytest.raises(QueueNotFound):
            expand(route('a'))
 
class test_lookup_route(RouteCase):
    """Tests for ``Router`` route lookup and destination expansion."""

    def test_init_queues(self):
        """``queues=None`` leaves the router with an empty mapping."""
        assert Router(self.app, queues=None).queues == {}

    def test_lookup_takes_first(self):
        """When several routes match, the first one listed is used."""
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        task_name = self.mytask.name
        candidates = (
            {task_name: {'queue': 'bar'}},
            {task_name: {'queue': 'foo'}},
        )
        prepared = routes.prepare(candidates)
        router = Router(self.app, prepared, self.app.amqp.queues)
        self.assert_routes_to_queue('bar', router, task_name)

    def test_expands_queue_in_options(self):
        """A queue named only in the options is expanded and merged."""
        set_queues(self.app)
        prepared = routes.prepare(())
        router = Router(
            self.app, prepared, self.app.amqp.queues, create_missing=True,
        )
        # apply_async forwards every argument, including explicit
        # ``exchange=None`` and the like, so they must merge correctly.
        options = {
            'queue': 'testq',
            'exchange': None,
            'routing_key': None,
            'immediate': False,
        }
        route = router.route(
            options, self.mytask.name, args=[1, 2], kwargs={},
        )
        queue = route['queue']
        assert queue.name == 'testq'
        assert queue.exchange == Exchange('testq')
        assert queue.routing_key == 'testq'
        assert route['immediate'] is False

    def test_expand_destination_string(self):
        """A bare queue name expands to the matching queue."""
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        router = Router(self.app, {}, self.app.amqp.queues)
        destination = router.expand_destination('foo')
        assert destination['queue'].name == 'foo'

    def test_expand_destination__Queue(self):
        """A ``Queue`` instance is kept as the very same object."""
        queue = Queue('foo')
        router = Router(self.app, {}, self.app.amqp.queues)
        destination = router.expand_destination({'queue': queue})
        assert destination['queue'] is queue

    def test_lookup_paths_traversed(self):
        """Later routes are consulted; unmatched tasks use the default."""
        self.simple_queue_setup()
        prepared = routes.prepare((
            {'celery.xaza': {'queue': 'bar'}},
            {self.mytask.name: {'queue': 'foo'}},
        ))
        router = Router(self.app, prepared, self.app.amqp.queues)
        self.assert_routes_to_queue('foo', router, self.mytask.name)
        self.assert_routes_to_default_queue(router, 'celery.poza')

    def test_compat_router_class(self):
        """An object exposing ``route_for_task`` works as a route."""
        self.simple_queue_setup()
        prepared = routes.prepare((TestRouter(),))
        router = Router(self.app, prepared, self.app.amqp.queues)
        self.assert_routes_to_queue('bar', router, 'celery.xaza')
        self.assert_routes_to_default_queue(router, 'celery.poza')

    def test_router_fun__called_with(self):
        """Callable routes get name, args, kwargs, options and ``task``."""
        self.simple_queue_setup()
        step = Mock(spec=['__call__'])
        step.return_value = None
        prepared = routes.prepare([step])
        router = Router(self.app, prepared, self.app.amqp.queues)
        self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3)
        step.assert_called_with(
            self.mytask.name, (2, 2), {'kw': 3}, ANY,
            task=self.mytask,
        )
        # The fourth positional argument is the options mapping.
        positional = step.call_args[0]
        assert positional[3]['priority'] == 3

    def test_compat_router_classes__called_with(self):
        """``route_for_task`` is called with name, args and kwargs only."""
        self.simple_queue_setup()
        step = Mock(spec=['route_for_task'])
        step.route_for_task.return_value = None
        prepared = routes.prepare([step])
        router = Router(self.app, prepared, self.app.amqp.queues)
        self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3)
        step.route_for_task.assert_called_with(
            self.mytask.name, (2, 2), {'kw': 3},
        )

    def simple_queue_setup(self):
        """Configure ``foo``, ``bar`` and the app's default queue."""
        default_queue = {self.app.conf.task_default_queue: self.d_queue}
        set_queues(
            self.app, foo=self.a_queue, bar=self.b_queue, **default_queue)
 
class TestRouter(object):
    """Router object exposing a ``route_for_task`` method."""

    def route_for_task(self, task, args, kwargs):
        """Return ``'bar'`` for ``'celery.xaza'``, otherwise ``None``."""
        return 'bar' if task == 'celery.xaza' else None
 
class test_prepare:
    """Tests for ``routes.prepare``."""

    def test_prepare(self):
        """Dicts, qualified names and other objects are each handled."""
        sentinel = object()
        prepared = routes.prepare([
            {'foo': 'bar'},
            qualname(TestRouter),
            sentinel,
        ])
        # A dict becomes a MapRoute.
        assert isinstance(prepared[0], routes.MapRoute)
        # A qualified class name evaluates to an instance of that class.
        assert isinstance(maybe_evaluate(prepared[1]), TestRouter)
        # Any other object is passed through as is.
        assert prepared[2] is sentinel

        # A single non-sequence object is wrapped in a list.
        assert routes.prepare(sentinel) == [sentinel]

    def test_prepare_item_is_dict(self):
        """A bare dict is prepared into a leading ``MapRoute``."""
        prepared = routes.prepare({'foo': 'bar'})
        assert isinstance(prepared[0], routes.MapRoute)
 
 
  |