test_routes.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. from __future__ import absolute_import
  2. from functools import wraps
  3. from kombu import Exchange
  4. from kombu.utils.functional import maybe_promise
  5. from celery import current_app
  6. from celery.app import routes
  7. from celery.exceptions import QueueNotFound
  8. from celery.task import task
  9. from celery.tests.utils import Case
  10. def Router(*args, **kwargs):
  11. return routes.Router(*args, app=current_app, **kwargs)
  12. @task()
  13. def mytask():
  14. pass
  15. def E(queues):
  16. def expand(answer):
  17. return Router([], queues).expand_destination(answer)
  18. return expand
  19. def with_queues(**queues):
  20. def patch_fun(fun):
  21. @wraps(fun)
  22. def __inner(*args, **kwargs):
  23. app = current_app
  24. prev_queues = app.conf.CELERY_QUEUES
  25. prev_Queues = app.amqp.queues
  26. app.conf.CELERY_QUEUES = queues
  27. app.amqp.queues = app.amqp.Queues(queues)
  28. try:
  29. return fun(*args, **kwargs)
  30. finally:
  31. app.conf.CELERY_QUEUES = prev_queues
  32. app.amqp.queues = prev_Queues
  33. return __inner
  34. return patch_fun
  35. a_queue = {'exchange': 'fooexchange',
  36. 'exchange_type': 'fanout',
  37. 'routing_key': 'xuzzy'}
  38. b_queue = {'exchange': 'barexchange',
  39. 'exchange_type': 'topic',
  40. 'routing_key': 'b.b.#'}
  41. d_queue = {'exchange': current_app.conf.CELERY_DEFAULT_EXCHANGE,
  42. 'exchange_type': current_app.conf.CELERY_DEFAULT_EXCHANGE_TYPE,
  43. 'routing_key': current_app.conf.CELERY_DEFAULT_ROUTING_KEY}
  44. class RouteCase(Case):
  45. pass
  46. class test_MapRoute(RouteCase):
  47. @with_queues(foo=a_queue, bar=b_queue)
  48. def test_route_for_task_expanded_route(self):
  49. expand = E(current_app.amqp.queues)
  50. route = routes.MapRoute({mytask.name: {'queue': 'foo'}})
  51. self.assertEqual(
  52. expand(route.route_for_task(mytask.name))['queue'].name,
  53. 'foo',
  54. )
  55. self.assertIsNone(route.route_for_task('celery.awesome'))
  56. @with_queues(foo=a_queue, bar=b_queue)
  57. def test_route_for_task(self):
  58. expand = E(current_app.amqp.queues)
  59. route = routes.MapRoute({mytask.name: b_queue})
  60. self.assertDictContainsSubset(b_queue,
  61. expand(route.route_for_task(mytask.name)))
  62. self.assertIsNone(route.route_for_task('celery.awesome'))
  63. def test_expand_route_not_found(self):
  64. expand = E(current_app.amqp.Queues(
  65. current_app.conf.CELERY_QUEUES, False))
  66. route = routes.MapRoute({'a': {'queue': 'x'}})
  67. with self.assertRaises(QueueNotFound):
  68. expand(route.route_for_task('a'))
  69. class test_lookup_route(RouteCase):
  70. def test_init_queues(self):
  71. router = Router(queues=None)
  72. self.assertDictEqual(router.queues, {})
  73. @with_queues(foo=a_queue, bar=b_queue)
  74. def test_lookup_takes_first(self):
  75. R = routes.prepare(({mytask.name: {'queue': 'bar'}},
  76. {mytask.name: {'queue': 'foo'}}))
  77. router = Router(R, current_app.amqp.queues)
  78. self.assertEqual(router.route({}, mytask.name,
  79. args=[1, 2], kwargs={})['queue'].name, 'bar')
  80. @with_queues()
  81. def test_expands_queue_in_options(self):
  82. R = routes.prepare(())
  83. router = Router(R, current_app.amqp.queues, create_missing=True)
  84. # apply_async forwards all arguments, even exchange=None etc,
  85. # so need to make sure it's merged correctly.
  86. route = router.route({'queue': 'testq',
  87. 'exchange': None,
  88. 'routing_key': None,
  89. 'immediate': False},
  90. mytask.name,
  91. args=[1, 2], kwargs={})
  92. self.assertEqual(route['queue'].name, 'testq')
  93. self.assertEqual(route['queue'].exchange, Exchange('testq'))
  94. self.assertEqual(route['queue'].routing_key, 'testq')
  95. self.assertEqual(route['immediate'], False)
  96. @with_queues(foo=a_queue, bar=b_queue)
  97. def test_expand_destination_string(self):
  98. x = Router({}, current_app.amqp.queues)
  99. dest = x.expand_destination('foo')
  100. self.assertEqual(dest['queue'].name, 'foo')
  101. @with_queues(foo=a_queue, bar=b_queue, **{
  102. current_app.conf.CELERY_DEFAULT_QUEUE: d_queue})
  103. def test_lookup_paths_traversed(self):
  104. R = routes.prepare(({'celery.xaza': {'queue': 'bar'}},
  105. {mytask.name: {'queue': 'foo'}}))
  106. router = Router(R, current_app.amqp.queues)
  107. self.assertEqual(router.route({}, mytask.name,
  108. args=[1, 2], kwargs={})['queue'].name, 'foo')
  109. self.assertEqual(router.route({}, 'celery.poza')['queue'].name,
  110. current_app.conf.CELERY_DEFAULT_QUEUE)
  111. class test_prepare(Case):
  112. def test_prepare(self):
  113. from celery.datastructures import LRUCache
  114. o = object()
  115. R = [{'foo': 'bar'},
  116. 'celery.datastructures.LRUCache',
  117. o]
  118. p = routes.prepare(R)
  119. self.assertIsInstance(p[0], routes.MapRoute)
  120. self.assertIsInstance(maybe_promise(p[1]), LRUCache)
  121. self.assertIs(p[2], o)
  122. self.assertEqual(routes.prepare(o), [o])
  123. def test_prepare_item_is_dict(self):
  124. R = {'foo': 'bar'}
  125. p = routes.prepare(R)
  126. self.assertIsInstance(p[0], routes.MapRoute)