test_routes.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import unittest2 as unittest
  2. from celery import conf
  3. from celery import routes
  4. from celery.utils.functional import wraps
  5. from celery.exceptions import QueueNotFound
  6. def E(queues):
  7. def expand(answer):
  8. return routes.Router([], queues).expand_destination(answer)
  9. return expand
  10. def with_queues(**queues):
  11. def patch_fun(fun):
  12. @wraps(fun)
  13. def __inner(*args, **kwargs):
  14. prev_queues = conf.QUEUES
  15. conf.QUEUES = queues
  16. try:
  17. return fun(*args, **kwargs)
  18. finally:
  19. conf.QUEUES = prev_queues
  20. return __inner
  21. return patch_fun
  22. a_queue = {"exchange": "fooexchange",
  23. "exchange_type": "fanout",
  24. "binding_key": "xuzzy"}
  25. b_queue = {"exchange": "barexchange",
  26. "exchange_type": "topic",
  27. "binding_key": "b.b.#"}
  28. class test_MapRoute(unittest.TestCase):
  29. @with_queues(foo=a_queue, bar=b_queue)
  30. def test_route_for_task_expanded_route(self):
  31. expand = E(conf.QUEUES)
  32. route = routes.MapRoute({"celery.ping": "foo"})
  33. self.assertDictContainsSubset(a_queue,
  34. expand(route.route_for_task("celery.ping")))
  35. self.assertIsNone(route.route_for_task("celery.awesome"))
  36. @with_queues(foo=a_queue, bar=b_queue)
  37. def test_route_for_task(self):
  38. expand = E(conf.QUEUES)
  39. route = routes.MapRoute({"celery.ping": b_queue})
  40. self.assertDictContainsSubset(b_queue,
  41. expand(route.route_for_task("celery.ping")))
  42. self.assertIsNone(route.route_for_task("celery.awesome"))
  43. def test_expand_route_not_found(self):
  44. expand = E(conf.QUEUES)
  45. route = routes.MapRoute({"a": "x"})
  46. self.assertRaises(QueueNotFound, expand, route.route_for_task("a"))
  47. class test_lookup_route(unittest.TestCase):
  48. def test_init_queues(self):
  49. router = routes.Router(queues=None)
  50. self.assertDictEqual(router.queues, {})
  51. @with_queues(foo=a_queue, bar=b_queue)
  52. def test_lookup_takes_first(self):
  53. R = routes.prepare(({"celery.ping": "bar"},
  54. {"celery.ping": "foo"}))
  55. router = routes.Router(R, conf.QUEUES)
  56. self.assertDictContainsSubset(b_queue,
  57. router.route({}, "celery.ping",
  58. args=[1, 2], kwargs={}))
  59. @with_queues(foo=a_queue, bar=b_queue)
  60. def test_lookup_paths_traversed(self):
  61. R = routes.prepare(({"celery.xaza": "bar"},
  62. {"celery.ping": "foo"}))
  63. router = routes.Router(R, conf.QUEUES)
  64. self.assertDictContainsSubset(a_queue,
  65. router.route({}, "celery.ping",
  66. args=[1, 2], kwargs={}))
  67. self.assertEqual(router.route({}, "celery.poza"), {})
  68. class test_prepare(unittest.TestCase):
  69. def test_prepare(self):
  70. from celery.datastructures import LocalCache
  71. o = object()
  72. R = [{"foo": "bar"},
  73. "celery.datastructures.LocalCache",
  74. o]
  75. p = routes.prepare(R)
  76. self.assertIsInstance(p[0], routes.MapRoute)
  77. self.assertIsInstance(p[1], LocalCache)
  78. self.assertIs(p[2], o)
  79. self.assertEqual(routes.prepare(o), [o])