# Source listing: test_routes.py (7.4 KB)
from __future__ import absolute_import, unicode_literals

import pytest
from case import ANY, Mock
from kombu import Exchange, Queue
from kombu.utils.functional import maybe_evaluate

from celery.app import routes
from celery.exceptions import QueueNotFound
from celery.five import items
from celery.utils.imports import qualname
  10. def Router(app, *args, **kwargs):
  11. return routes.Router(*args, app=app, **kwargs)
  12. def E(app, queues):
  13. def expand(answer):
  14. return Router(app, [], queues).expand_destination(answer)
  15. return expand
  16. def set_queues(app, **queues):
  17. app.conf.task_queues = queues
  18. app.amqp.queues = app.amqp.Queues(queues)
  19. class RouteCase:
  20. def setup(self):
  21. self.a_queue = {
  22. 'exchange': 'fooexchange',
  23. 'exchange_type': 'fanout',
  24. 'routing_key': 'xuzzy',
  25. }
  26. self.b_queue = {
  27. 'exchange': 'barexchange',
  28. 'exchange_type': 'topic',
  29. 'routing_key': 'b.b.#',
  30. }
  31. self.d_queue = {
  32. 'exchange': self.app.conf.task_default_exchange,
  33. 'exchange_type': self.app.conf.task_default_exchange_type,
  34. 'routing_key': self.app.conf.task_default_routing_key,
  35. }
  36. @self.app.task(shared=False)
  37. def mytask(*args, **kwargs):
  38. pass
  39. self.mytask = mytask
  40. def assert_routes_to_queue(self, queue, router, name,
  41. args=[], kwargs={}, options={}):
  42. assert router.route(options, name, args, kwargs)['queue'].name == queue
  43. def assert_routes_to_default_queue(self, router, name, *args, **kwargs):
  44. self.assert_routes_to_queue(
  45. self.app.conf.task_default_queue, router, name, *args, **kwargs)
  46. class test_MapRoute(RouteCase):
  47. def test_route_for_task_expanded_route(self):
  48. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  49. expand = E(self.app, self.app.amqp.queues)
  50. route = routes.MapRoute({self.mytask.name: {'queue': 'foo'}})
  51. assert expand(route(self.mytask.name))['queue'].name == 'foo'
  52. assert route('celery.awesome') is None
  53. def test_route_for_task(self):
  54. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  55. expand = E(self.app, self.app.amqp.queues)
  56. route = routes.MapRoute({self.mytask.name: self.b_queue})
  57. eroute = expand(route(self.mytask.name))
  58. for key, value in items(self.b_queue):
  59. assert eroute[key] == value
  60. assert route('celery.awesome') is None
  61. def test_route_for_task__glob(self):
  62. route = routes.MapRoute([
  63. ('proj.tasks.*', 'routeA'),
  64. ('demoapp.tasks.bar.*', {'exchange': 'routeB'}),
  65. ])
  66. assert route('proj.tasks.foo') == {'queue': 'routeA'}
  67. assert route('demoapp.tasks.bar.moo') == {'exchange': 'routeB'}
  68. assert route('demoapp.foo.bar.moo') is None
  69. def test_expand_route_not_found(self):
  70. expand = E(self.app, self.app.amqp.Queues(
  71. self.app.conf.task_queues, False))
  72. route = routes.MapRoute({'a': {'queue': 'x'}})
  73. with pytest.raises(QueueNotFound):
  74. expand(route('a'))
  75. class test_lookup_route(RouteCase):
  76. def test_init_queues(self):
  77. router = Router(self.app, queues=None)
  78. assert router.queues == {}
  79. def test_lookup_takes_first(self):
  80. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  81. R = routes.prepare(({self.mytask.name: {'queue': 'bar'}},
  82. {self.mytask.name: {'queue': 'foo'}}))
  83. router = Router(self.app, R, self.app.amqp.queues)
  84. self.assert_routes_to_queue('bar', router, self.mytask.name)
  85. def test_expands_queue_in_options(self):
  86. set_queues(self.app)
  87. R = routes.prepare(())
  88. router = Router(
  89. self.app, R, self.app.amqp.queues, create_missing=True,
  90. )
  91. # apply_async forwards all arguments, even exchange=None etc,
  92. # so need to make sure it's merged correctly.
  93. route = router.route(
  94. {'queue': 'testq',
  95. 'exchange': None,
  96. 'routing_key': None,
  97. 'immediate': False},
  98. self.mytask.name,
  99. args=[1, 2], kwargs={},
  100. )
  101. assert route['queue'].name == 'testq'
  102. assert route['queue'].exchange == Exchange('testq')
  103. assert route['queue'].routing_key == 'testq'
  104. assert route['immediate'] is False
  105. def test_expand_destination_string(self):
  106. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  107. x = Router(self.app, {}, self.app.amqp.queues)
  108. dest = x.expand_destination('foo')
  109. assert dest['queue'].name == 'foo'
  110. def test_expand_destination__Queue(self):
  111. queue = Queue('foo')
  112. x = Router(self.app, {}, self.app.amqp.queues)
  113. dest = x.expand_destination({'queue': queue})
  114. assert dest['queue'] is queue
  115. def test_lookup_paths_traversed(self):
  116. self.simple_queue_setup()
  117. R = routes.prepare((
  118. {'celery.xaza': {'queue': 'bar'}},
  119. {self.mytask.name: {'queue': 'foo'}}
  120. ))
  121. router = Router(self.app, R, self.app.amqp.queues)
  122. self.assert_routes_to_queue('foo', router, self.mytask.name)
  123. self.assert_routes_to_default_queue(router, 'celery.poza')
  124. def test_compat_router_class(self):
  125. self.simple_queue_setup()
  126. R = routes.prepare((
  127. TestRouter(),
  128. ))
  129. router = Router(self.app, R, self.app.amqp.queues)
  130. self.assert_routes_to_queue('bar', router, 'celery.xaza')
  131. self.assert_routes_to_default_queue(router, 'celery.poza')
  132. def test_router_fun__called_with(self):
  133. self.simple_queue_setup()
  134. step = Mock(spec=['__call__'])
  135. step.return_value = None
  136. R = routes.prepare([step])
  137. router = Router(self.app, R, self.app.amqp.queues)
  138. self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3)
  139. step.assert_called_with(
  140. self.mytask.name, (2, 2), {'kw': 3}, ANY,
  141. task=self.mytask,
  142. )
  143. options = step.call_args[0][3]
  144. assert options['priority'] == 3
  145. def test_compat_router_classes__called_with(self):
  146. self.simple_queue_setup()
  147. step = Mock(spec=['route_for_task'])
  148. step.route_for_task.return_value = None
  149. R = routes.prepare([step])
  150. router = Router(self.app, R, self.app.amqp.queues)
  151. self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3)
  152. step.route_for_task.assert_called_with(
  153. self.mytask.name, (2, 2), {'kw': 3},
  154. )
  155. def simple_queue_setup(self):
  156. set_queues(
  157. self.app, foo=self.a_queue, bar=self.b_queue,
  158. **{self.app.conf.task_default_queue: self.d_queue})
  159. class TestRouter(object):
  160. def route_for_task(self, task, args, kwargs):
  161. if task == 'celery.xaza':
  162. return 'bar'
  163. class test_prepare:
  164. def test_prepare(self):
  165. o = object()
  166. R = [
  167. {'foo': 'bar'},
  168. qualname(TestRouter),
  169. o,
  170. ]
  171. p = routes.prepare(R)
  172. assert isinstance(p[0], routes.MapRoute)
  173. assert isinstance(maybe_evaluate(p[1]), TestRouter)
  174. assert p[2] is o
  175. assert routes.prepare(o) == [o]
  176. def test_prepare_item_is_dict(self):
  177. R = {'foo': 'bar'}
  178. p = routes.prepare(R)
  179. assert isinstance(p[0], routes.MapRoute)