test_routes.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import unittest2 as unittest
  2. from celery import conf
  3. from celery import routes
  4. from celery.utils import gen_unique_id
  5. from celery.utils.functional import wraps
  6. from celery.exceptions import RouteNotFound
  7. def E(routing_table):
  8. def expand(answer):
  9. return routes.expand_destination(answer, routing_table)
  10. return expand
  11. def with_queues(**queues):
  12. def patch_fun(fun):
  13. @wraps(fun)
  14. def __inner(*args, **kwargs):
  15. prev_queues = conf.QUEUES
  16. conf.QUEUES = queues
  17. try:
  18. return fun(*args, **kwargs)
  19. finally:
  20. conf.QUEUES = prev_queues
  21. return __inner
  22. return patch_fun
  23. a_route = {"exchange": "fooexchange",
  24. "exchange_type": "fanout",
  25. "binding_key": "xuzzy"}
  26. b_route = {"exchange": "barexchange",
  27. "exchange_type": "topic",
  28. "binding_key": "b.b.#"}
  29. class test_MapRoute(unittest.TestCase):
  30. @with_queues(foo=a_route, bar=b_route)
  31. def test_route_for_task_expanded_route(self):
  32. expand = E(conf.QUEUES)
  33. route = routes.MapRoute({"celery.ping": "foo"})
  34. self.assertDictContainsSubset(a_route,
  35. expand(route.route_for_task("celery.ping")))
  36. self.assertIsNone(route.route_for_task("celery.awesome"))
  37. @with_queues(foo=a_route, bar=b_route)
  38. def test_route_for_task(self):
  39. expand = E(conf.QUEUES)
  40. route = routes.MapRoute({"celery.ping": b_route})
  41. self.assertDictContainsSubset(b_route,
  42. expand(route.route_for_task("celery.ping")))
  43. self.assertIsNone(route.route_for_task("celery.awesome"))
  44. def test_expand_route_not_found(self):
  45. expand = E(conf.QUEUES)
  46. route = routes.MapRoute({"a": "x"})
  47. self.assertRaises(RouteNotFound, expand, route.route_for_task("a"))
  48. class test_lookup_route(unittest.TestCase):
  49. @with_queues(foo=a_route, bar=b_route)
  50. def test_lookup_takes_first(self):
  51. expand = E(conf.QUEUES)
  52. R = routes.prepare(({"celery.ping": "bar"},
  53. {"celery.ping": "foo"}))
  54. self.assertDictContainsSubset(b_route,
  55. expand(routes.lookup_route(R, "celery.ping", gen_unique_id(),
  56. args=[1, 2], kwargs={})))
  57. @with_queues(foo=a_route, bar=b_route)
  58. def test_lookup_paths_traversed(self):
  59. expand = E(conf.QUEUES)
  60. R = routes.prepare(({"celery.xaza": "bar"},
  61. {"celery.ping": "foo"}))
  62. self.assertDictContainsSubset(a_route,
  63. expand(routes.lookup_route(R, "celery.ping", gen_unique_id(),
  64. args=[1, 2], kwargs={})))
  65. self.assertIsNone(routes.lookup_route(R, "celery.poza"))
  66. class test_lookup_disabled(unittest.TestCase):
  67. def test_disabled(self):
  68. def create_router(name, is_disabled):
  69. class _Router(object):
  70. def disabled(self, task, *args):
  71. if task == name:
  72. return is_disabled
  73. return _Router()
  74. A = create_router("celery.ping", True)
  75. B = create_router("celery.ping", False)
  76. C = object()
  77. R1 = (routes.prepare((A, B, C)), True)
  78. R2 = (routes.prepare((B, C, A)), False)
  79. R3 = (routes.prepare((C, A, B)), True)
  80. R4 = (routes.prepare((B, A, C)), False)
  81. R5 = (routes.prepare((A, C, B)), True)
  82. R6 = (routes.prepare((C, B, A)), False)
  83. for i, (router, state) in enumerate((R1, R2, R3, R4, R5, R6)):
  84. self.assertEqual(routes.lookup_disabled(router, "celery.ping"),
  85. state, "ok %d" % i)