test_routes.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. from functools import wraps
  4. from celery import routes
  5. from celery import current_app
  6. from celery.exceptions import QueueNotFound
  7. from celery.task import task
  8. from celery.utils import maybe_promise
  9. from celery.tests.utils import Case
  10. @task
  11. def mytask():
  12. pass
  13. def E(queues):
  14. def expand(answer):
  15. return routes.Router([], queues).expand_destination(answer)
  16. return expand
  17. def with_queues(**queues):
  18. def patch_fun(fun):
  19. @wraps(fun)
  20. def __inner(*args, **kwargs):
  21. app = current_app
  22. prev_queues = app.conf.CELERY_QUEUES
  23. prev_Queues = app.amqp.queues
  24. app.conf.CELERY_QUEUES = queues
  25. app.amqp.queues = app.amqp.Queues(queues)
  26. try:
  27. return fun(*args, **kwargs)
  28. finally:
  29. app.conf.CELERY_QUEUES = prev_queues
  30. app.amqp.queues = prev_Queues
  31. return __inner
  32. return patch_fun
  33. a_queue = {"exchange": "fooexchange",
  34. "exchange_type": "fanout",
  35. "binding_key": "xuzzy"}
  36. b_queue = {"exchange": "barexchange",
  37. "exchange_type": "topic",
  38. "binding_key": "b.b.#"}
  39. d_queue = {"exchange": current_app.conf.CELERY_DEFAULT_EXCHANGE,
  40. "exchange_type": current_app.conf.CELERY_DEFAULT_EXCHANGE_TYPE,
  41. "routing_key": current_app.conf.CELERY_DEFAULT_ROUTING_KEY}
  42. class test_MapRoute(Case):
  43. @with_queues(foo=a_queue, bar=b_queue)
  44. def test_route_for_task_expanded_route(self):
  45. expand = E(current_app.conf.CELERY_QUEUES)
  46. route = routes.MapRoute({mytask.name: {"queue": "foo"}})
  47. self.assertDictContainsSubset(a_queue,
  48. expand(route.route_for_task(mytask.name)))
  49. self.assertIsNone(route.route_for_task("celery.awesome"))
  50. @with_queues(foo=a_queue, bar=b_queue)
  51. def test_route_for_task(self):
  52. expand = E(current_app.conf.CELERY_QUEUES)
  53. route = routes.MapRoute({mytask.name: b_queue})
  54. self.assertDictContainsSubset(b_queue,
  55. expand(route.route_for_task(mytask.name)))
  56. self.assertIsNone(route.route_for_task("celery.awesome"))
  57. def test_expand_route_not_found(self):
  58. expand = E(current_app.conf.CELERY_QUEUES)
  59. route = routes.MapRoute({"a": {"queue": "x"}})
  60. with self.assertRaises(QueueNotFound):
  61. expand(route.route_for_task("a"))
  62. class test_lookup_route(Case):
  63. def test_init_queues(self):
  64. router = routes.Router(queues=None)
  65. self.assertDictEqual(router.queues, {})
  66. @with_queues(foo=a_queue, bar=b_queue)
  67. def test_lookup_takes_first(self):
  68. R = routes.prepare(({mytask.name: {"queue": "bar"}},
  69. {mytask.name: {"queue": "foo"}}))
  70. router = routes.Router(R, current_app.conf.CELERY_QUEUES)
  71. self.assertDictContainsSubset(b_queue,
  72. router.route({}, mytask.name,
  73. args=[1, 2], kwargs={}))
  74. @with_queues()
  75. def test_expands_queue_in_options(self):
  76. R = routes.prepare(())
  77. router = routes.Router(R, current_app.conf.CELERY_QUEUES,
  78. create_missing=True)
  79. # apply_async forwards all arguments, even exchange=None etc,
  80. # so need to make sure it's merged correctly.
  81. route = router.route({"queue": "testq",
  82. "exchange": None,
  83. "routing_key": None,
  84. "immediate": False},
  85. mytask.name,
  86. args=[1, 2], kwargs={})
  87. self.assertDictContainsSubset({"exchange": "testq",
  88. "routing_key": "testq",
  89. "immediate": False},
  90. route)
  91. self.assertIn("queue", route)
  92. @with_queues(foo=a_queue, bar=b_queue)
  93. def test_expand_destaintion_string(self):
  94. x = routes.Router({}, current_app.conf.CELERY_QUEUES)
  95. dest = x.expand_destination("foo")
  96. self.assertEqual(dest["exchange"], "fooexchange")
  97. @with_queues(foo=a_queue, bar=b_queue, **{
  98. current_app.conf.CELERY_DEFAULT_QUEUE: d_queue})
  99. def test_lookup_paths_traversed(self):
  100. R = routes.prepare(({"celery.xaza": {"queue": "bar"}},
  101. {mytask.name: {"queue": "foo"}}))
  102. router = routes.Router(R, current_app.amqp.queues)
  103. self.assertDictContainsSubset(a_queue,
  104. router.route({}, mytask.name,
  105. args=[1, 2], kwargs={}))
  106. self.assertEqual(router.route({}, "celery.poza"),
  107. dict(d_queue, queue=current_app.conf.CELERY_DEFAULT_QUEUE))
  108. class test_prepare(Case):
  109. def test_prepare(self):
  110. from celery.datastructures import LRUCache
  111. o = object()
  112. R = [{"foo": "bar"},
  113. "celery.datastructures.LRUCache",
  114. o]
  115. p = routes.prepare(R)
  116. self.assertIsInstance(p[0], routes.MapRoute)
  117. self.assertIsInstance(maybe_promise(p[1]), LRUCache)
  118. self.assertIs(p[2], o)
  119. self.assertEqual(routes.prepare(o), [o])
  120. def test_prepare_item_is_dict(self):
  121. R = {"foo": "bar"}
  122. p = routes.prepare(R)
  123. self.assertIsInstance(p[0], routes.MapRoute)