123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import unittest2 as unittest
- from celery import conf
- from celery import routes
- from celery.utils import gen_unique_id
- from celery.utils.functional import wraps
- from celery.exceptions import RouteNotFound
- def E(routing_table):
- def expand(answer):
- return routes.expand_destination(answer, routing_table)
- return expand
- def with_queues(**queues):
- def patch_fun(fun):
- @wraps(fun)
- def __inner(*args, **kwargs):
- prev_queues = conf.QUEUES
- conf.QUEUES = queues
- try:
- return fun(*args, **kwargs)
- finally:
- conf.QUEUES = prev_queues
- return __inner
- return patch_fun
- a_route = {"exchange": "fooexchange",
- "exchange_type": "fanout",
- "binding_key": "xuzzy"}
- b_route = {"exchange": "barexchange",
- "exchange_type": "topic",
- "binding_key": "b.b.#"}
- class test_MapRoute(unittest.TestCase):
- @with_queues(foo=a_route, bar=b_route)
- def test_route_for_task_expanded_route(self):
- expand = E(conf.QUEUES)
- route = routes.MapRoute({"celery.ping": "foo"})
- self.assertDictContainsSubset(a_route,
- expand(route.route_for_task("celery.ping")))
- self.assertIsNone(route.route_for_task("celery.awesome"))
- @with_queues(foo=a_route, bar=b_route)
- def test_route_for_task(self):
- expand = E(conf.QUEUES)
- route = routes.MapRoute({"celery.ping": b_route})
- self.assertDictContainsSubset(b_route,
- expand(route.route_for_task("celery.ping")))
- self.assertIsNone(route.route_for_task("celery.awesome"))
- def test_expand_route_not_found(self):
- expand = E(conf.QUEUES)
- route = routes.MapRoute({"a": "x"})
- self.assertRaises(RouteNotFound, expand, route.route_for_task("a"))
- class test_lookup_route(unittest.TestCase):
- @with_queues(foo=a_route, bar=b_route)
- def test_lookup_takes_first(self):
- expand = E(conf.QUEUES)
- R = routes.prepare(({"celery.ping": "bar"},
- {"celery.ping": "foo"}))
- self.assertDictContainsSubset(b_route,
- expand(routes.lookup_route(R, "celery.ping", gen_unique_id(),
- args=[1, 2], kwargs={})))
- @with_queues(foo=a_route, bar=b_route)
- def test_lookup_paths_traversed(self):
- expand = E(conf.QUEUES)
- R = routes.prepare(({"celery.xaza": "bar"},
- {"celery.ping": "foo"}))
- self.assertDictContainsSubset(a_route,
- expand(routes.lookup_route(R, "celery.ping", gen_unique_id(),
- args=[1, 2], kwargs={})))
- self.assertIsNone(routes.lookup_route(R, "celery.poza"))
- class test_lookup_disabled(unittest.TestCase):
- def test_disabled(self):
- def create_router(name, is_disabled):
- class _Router(object):
- def disabled(self, task, *args):
- if task == name:
- return is_disabled
- return _Router()
- A = create_router("celery.ping", True)
- B = create_router("celery.ping", False)
- C = object()
- R1 = (routes.prepare((A, B, C)), True)
- R2 = (routes.prepare((B, C, A)), False)
- R3 = (routes.prepare((C, A, B)), True)
- R4 = (routes.prepare((B, A, C)), False)
- R5 = (routes.prepare((A, C, B)), True)
- R6 = (routes.prepare((C, B, A)), False)
- for i, (router, state) in enumerate((R1, R2, R3, R4, R5, R6)):
- self.assertEqual(routes.lookup_disabled(router, "celery.ping"),
- state, "ok %d" % i)
|