12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- from celery.utils import instantiate
- from celery.exceptions import RouteNotFound
- # Route from mapping
- class MapRoute(object):
- def __init__(self, map):
- self.map = map
- def route_for_task(self, task, *args, **kwargs):
- return self.map.get(task)
- def expand_destination(route, routing_table):
- if isinstance(route, basestring):
- try:
- dest = dict(routing_table[route])
- except KeyError, exc:
- raise RouteNotFound(
- "Route %s does not exist in the routing table "
- "(CELERY_QUEUES)" % route)
- dest.setdefault("routing_key", dest.get("binding_key"))
- return dest
- return route
- def prepare(routes):
- """Expand ROUTES setting."""
- def expand_route(route):
- if isinstance(route, dict):
- return MapRoute(route)
- if isinstance(route, basestring):
- return instantiate(route)
- return route
- if not hasattr(routes, "__iter__"):
- routes = (routes, )
- return map(expand_route, routes)
- def firstmatcher(method):
- """With a list of instances, find the first instance that returns a
- value for the given method."""
- def _matcher(seq, *args, **kwargs):
- for cls in seq:
- try:
- answer = getattr(cls, method)(*args, **kwargs)
- if answer is not None:
- return answer
- except AttributeError:
- pass
- return _matcher
- _first_route = firstmatcher("route_for_task")
- _first_disabled = firstmatcher("disabled")
- def lookup_route(routes, task, task_id=None, args=None, kwargs=None):
- return _first_route(routes, task, task_id, args, kwargs)
- def lookup_disabled(routes, task, task_id=None, args =None, kwargs=None):
- return _first_disabled(routes, task, task_id, args, kwargs)
|