routes.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. from celery.utils import instantiate
  2. from celery.exceptions import RouteNotFound
  3. # Route from mapping
  4. class MapRoute(object):
  5. def __init__(self, map):
  6. self.map = map
  7. def route_for_task(self, task, *args, **kwargs):
  8. return self.map.get(task)
  9. def expand_destination(route, routing_table):
  10. if isinstance(route, basestring):
  11. try:
  12. dest = dict(routing_table[route])
  13. except KeyError, exc:
  14. raise RouteNotFound(
  15. "Route %s does not exist in the routing table "
  16. "(CELERY_QUEUES)" % route)
  17. dest.setdefault("routing_key", dest.get("binding_key"))
  18. return dest
  19. return route
  20. def prepare(routes):
  21. """Expand ROUTES setting."""
  22. def expand_route(route):
  23. if isinstance(route, dict):
  24. return MapRoute(route)
  25. if isinstance(route, basestring):
  26. return instantiate(route)
  27. return route
  28. if not hasattr(routes, "__iter__"):
  29. routes = (routes, )
  30. return map(expand_route, routes)
  31. def firstmatcher(method):
  32. """With a list of instances, find the first instance that returns a
  33. value for the given method."""
  34. def _matcher(seq, *args, **kwargs):
  35. for cls in seq:
  36. try:
  37. answer = getattr(cls, method)(*args, **kwargs)
  38. if answer is not None:
  39. return answer
  40. except AttributeError:
  41. pass
  42. return _matcher
  43. _first_route = firstmatcher("route_for_task")
  44. _first_disabled = firstmatcher("disabled")
  45. def lookup_route(routes, task, task_id=None, args=None, kwargs=None):
  46. return _first_route(routes, task, task_id, args, kwargs)
  47. def lookup_disabled(routes, task, task_id=None, args =None, kwargs=None):
  48. return _first_disabled(routes, task, task_id, args, kwargs)