# test_routes.py -- unit tests for celery.routes (MapRoute and Router lookup).
  1. import unittest2 as unittest
  2. from celery import conf
  3. from celery import routes
  4. from celery.utils.functional import wraps
  5. from celery.exceptions import QueueNotFound
  6. def E(queues):
  7. def expand(answer):
  8. return routes.Router([], queues).expand_destination(answer)
  9. return expand
  10. def with_queues(**queues):
  11. def patch_fun(fun):
  12. @wraps(fun)
  13. def __inner(*args, **kwargs):
  14. prev_queues = conf.QUEUES
  15. conf.QUEUES = queues
  16. try:
  17. return fun(*args, **kwargs)
  18. finally:
  19. conf.QUEUES = prev_queues
  20. return __inner
  21. return patch_fun
  22. a_queue = {"exchange": "fooexchange",
  23. "exchange_type": "fanout",
  24. "binding_key": "xuzzy"}
  25. b_queue = {"exchange": "barexchange",
  26. "exchange_type": "topic",
  27. "binding_key": "b.b.#"}
  28. class test_MapRoute(unittest.TestCase):
  29. @with_queues(foo=a_queue, bar=b_queue)
  30. def test_route_for_task_expanded_route(self):
  31. expand = E(conf.QUEUES)
  32. route = routes.MapRoute({"celery.ping": "foo"})
  33. self.assertDictContainsSubset(a_queue,
  34. expand(route.route_for_task("celery.ping")))
  35. self.assertIsNone(route.route_for_task("celery.awesome"))
  36. @with_queues(foo=a_queue, bar=b_queue)
  37. def test_route_for_task(self):
  38. expand = E(conf.QUEUES)
  39. route = routes.MapRoute({"celery.ping": b_queue})
  40. self.assertDictContainsSubset(b_queue,
  41. expand(route.route_for_task("celery.ping")))
  42. self.assertIsNone(route.route_for_task("celery.awesome"))
  43. def test_expand_route_not_found(self):
  44. expand = E(conf.QUEUES)
  45. route = routes.MapRoute({"a": "x"})
  46. self.assertRaises(QueueNotFound, expand, route.route_for_task("a"))
  47. class test_lookup_route(unittest.TestCase):
  48. @with_queues(foo=a_queue, bar=b_queue)
  49. def test_lookup_takes_first(self):
  50. R = routes.prepare(({"celery.ping": "bar"},
  51. {"celery.ping": "foo"}))
  52. router = routes.Router(R, conf.QUEUES)
  53. self.assertDictContainsSubset(b_queue,
  54. router.route({}, "celery.ping",
  55. args=[1, 2], kwargs={}))
  56. @with_queues(foo=a_queue, bar=b_queue)
  57. def test_lookup_paths_traversed(self):
  58. R = routes.prepare(({"celery.xaza": "bar"},
  59. {"celery.ping": "foo"}))
  60. router = routes.Router(R, conf.QUEUES)
  61. self.assertDictContainsSubset(a_queue,
  62. router.route({}, "celery.ping",
  63. args=[1, 2], kwargs={}))
  64. self.assertEqual(router.route({}, "celery.poza"), {})