test_routes.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import ANY, Mock
  4. from kombu import Exchange, Queue
  5. from kombu.utils.functional import maybe_evaluate
  6. from celery.app import routes
  7. from celery.exceptions import QueueNotFound
  8. from celery.five import items
  9. from celery.utils.imports import qualname
  10. def Router(app, *args, **kwargs):
  11. return routes.Router(*args, app=app, **kwargs)
  12. def E(app, queues):
  13. def expand(answer):
  14. return Router(app, [], queues).expand_destination(answer)
  15. return expand
  16. def set_queues(app, **queues):
  17. app.conf.task_queues = queues
  18. app.amqp.queues = app.amqp.Queues(queues)
  19. class RouteCase:
  20. def setup(self):
  21. self.a_queue = {
  22. 'exchange': 'fooexchange',
  23. 'exchange_type': 'fanout',
  24. 'routing_key': 'xuzzy',
  25. }
  26. self.b_queue = {
  27. 'exchange': 'barexchange',
  28. 'exchange_type': 'topic',
  29. 'routing_key': 'b.b.#',
  30. }
  31. self.d_queue = {
  32. 'exchange': self.app.conf.task_default_exchange,
  33. 'exchange_type': self.app.conf.task_default_exchange_type,
  34. 'routing_key': self.app.conf.task_default_routing_key,
  35. }
  36. @self.app.task(shared=False)
  37. def mytask(*args, **kwargs):
  38. pass
  39. self.mytask = mytask
  40. def assert_routes_to_queue(self, queue, router, name,
  41. args=[], kwargs={}, options={}):
  42. assert router.route(options, name, args, kwargs)['queue'].name == queue
  43. def assert_routes_to_default_queue(self, router, name, *args, **kwargs):
  44. self.assert_routes_to_queue(
  45. self.app.conf.task_default_queue, router, name, *args, **kwargs)
  46. class test_MapRoute(RouteCase):
  47. def test_route_for_task_expanded_route(self):
  48. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  49. expand = E(self.app, self.app.amqp.queues)
  50. route = routes.MapRoute({self.mytask.name: {'queue': 'foo'}})
  51. assert expand(route(self.mytask.name))['queue'].name == 'foo'
  52. assert route('celery.awesome') is None
  53. def test_route_for_task(self):
  54. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  55. expand = E(self.app, self.app.amqp.queues)
  56. route = routes.MapRoute({self.mytask.name: self.b_queue})
  57. eroute = expand(route(self.mytask.name))
  58. for key, value in items(self.b_queue):
  59. assert eroute[key] == value
  60. assert route('celery.awesome') is None
  61. def test_route_for_task__glob(self):
  62. from re import compile
  63. route = routes.MapRoute([
  64. ('proj.tasks.*', 'routeA'),
  65. ('demoapp.tasks.bar.*', {'exchange': 'routeB'}),
  66. (compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
  67. ])
  68. assert route('proj.tasks.foo') == {'queue': 'routeA'}
  69. assert route('demoapp.tasks.bar.moo') == {'exchange': 'routeB'}
  70. assert route('video.tasks.foo') == {'queue': 'media'}
  71. assert route('image.tasks.foo') == {'queue': 'media'}
  72. assert route('demoapp.foo.bar.moo') is None
  73. def test_expand_route_not_found(self):
  74. expand = E(self.app, self.app.amqp.Queues(
  75. self.app.conf.task_queues, False))
  76. route = routes.MapRoute({'a': {'queue': 'x'}})
  77. with pytest.raises(QueueNotFound):
  78. expand(route('a'))
  79. class test_lookup_route(RouteCase):
  80. def test_init_queues(self):
  81. router = Router(self.app, queues=None)
  82. assert router.queues == {}
  83. def test_lookup_takes_first(self):
  84. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  85. R = routes.prepare(({self.mytask.name: {'queue': 'bar'}},
  86. {self.mytask.name: {'queue': 'foo'}}))
  87. router = Router(self.app, R, self.app.amqp.queues)
  88. self.assert_routes_to_queue('bar', router, self.mytask.name)
  89. def test_expands_queue_in_options(self):
  90. set_queues(self.app)
  91. R = routes.prepare(())
  92. router = Router(
  93. self.app, R, self.app.amqp.queues, create_missing=True,
  94. )
  95. # apply_async forwards all arguments, even exchange=None etc,
  96. # so need to make sure it's merged correctly.
  97. route = router.route(
  98. {'queue': 'testq',
  99. 'exchange': None,
  100. 'routing_key': None,
  101. 'immediate': False},
  102. self.mytask.name,
  103. args=[1, 2], kwargs={},
  104. )
  105. assert route['queue'].name == 'testq'
  106. assert route['queue'].exchange == Exchange('testq')
  107. assert route['queue'].routing_key == 'testq'
  108. assert route['immediate'] is False
  109. def test_expand_destination_string(self):
  110. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  111. x = Router(self.app, {}, self.app.amqp.queues)
  112. dest = x.expand_destination('foo')
  113. assert dest['queue'].name == 'foo'
  114. def test_expand_destination__Queue(self):
  115. queue = Queue('foo')
  116. x = Router(self.app, {}, self.app.amqp.queues)
  117. dest = x.expand_destination({'queue': queue})
  118. assert dest['queue'] is queue
  119. def test_lookup_paths_traversed(self):
  120. self.simple_queue_setup()
  121. R = routes.prepare((
  122. {'celery.xaza': {'queue': 'bar'}},
  123. {self.mytask.name: {'queue': 'foo'}}
  124. ))
  125. router = Router(self.app, R, self.app.amqp.queues)
  126. self.assert_routes_to_queue('foo', router, self.mytask.name)
  127. self.assert_routes_to_default_queue(router, 'celery.poza')
  128. def test_compat_router_class(self):
  129. self.simple_queue_setup()
  130. R = routes.prepare((
  131. TestRouter(),
  132. ))
  133. router = Router(self.app, R, self.app.amqp.queues)
  134. self.assert_routes_to_queue('bar', router, 'celery.xaza')
  135. self.assert_routes_to_default_queue(router, 'celery.poza')
  136. def test_router_fun__called_with(self):
  137. self.simple_queue_setup()
  138. step = Mock(spec=['__call__'])
  139. step.return_value = None
  140. R = routes.prepare([step])
  141. router = Router(self.app, R, self.app.amqp.queues)
  142. self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3)
  143. step.assert_called_with(
  144. self.mytask.name, (2, 2), {'kw': 3}, ANY,
  145. task=self.mytask,
  146. )
  147. options = step.call_args[0][3]
  148. assert options['priority'] == 3
  149. def test_compat_router_classes__called_with(self):
  150. self.simple_queue_setup()
  151. step = Mock(spec=['route_for_task'])
  152. step.route_for_task.return_value = None
  153. R = routes.prepare([step])
  154. router = Router(self.app, R, self.app.amqp.queues)
  155. self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3)
  156. step.route_for_task.assert_called_with(
  157. self.mytask.name, (2, 2), {'kw': 3},
  158. )
  159. def simple_queue_setup(self):
  160. set_queues(
  161. self.app, foo=self.a_queue, bar=self.b_queue,
  162. **{self.app.conf.task_default_queue: self.d_queue})
  163. class TestRouter(object):
  164. def route_for_task(self, task, args, kwargs):
  165. if task == 'celery.xaza':
  166. return 'bar'
  167. class test_prepare:
  168. def test_prepare(self):
  169. o = object()
  170. R = [
  171. {'foo': 'bar'},
  172. qualname(TestRouter),
  173. o,
  174. ]
  175. p = routes.prepare(R)
  176. assert isinstance(p[0], routes.MapRoute)
  177. assert isinstance(maybe_evaluate(p[1]), TestRouter)
  178. assert p[2] is o
  179. assert routes.prepare(o) == [o]
  180. def test_prepare_item_is_dict(self):
  181. R = {'foo': 'bar'}
  182. p = routes.prepare(R)
  183. assert isinstance(p[0], routes.MapRoute)