routes.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. from celery.exceptions import QueueNotFound
  2. from celery.utils import instantiate, firstmethod, mpromise
# Dispatcher used by ``Router.lookup_route``, which calls it with the list of
# routers followed by the task, args and kwargs.  Presumably it returns the
# first non-empty ``route_for_task`` result among those routers -- confirm
# against ``celery.utils.firstmethod``.
_first_route = firstmethod("route_for_task")
  4. class MapRoute(object):
  5. """Makes a router out of a :class:`dict`."""
  6. def __init__(self, map):
  7. self.map = map
  8. def route_for_task(self, task, *args, **kwargs):
  9. route = self.map.get(task)
  10. if route:
  11. return dict(route)
  12. class Router(object):
  13. def __init__(self, routes=None, queues=None, create_missing=False):
  14. if queues is None:
  15. queues = {}
  16. if routes is None:
  17. routes = []
  18. self.queues = queues
  19. self.routes = routes
  20. self.create_missing = create_missing
  21. def add_queue(self, queue):
  22. q = self.queues[queue] = {"binding_key": queue,
  23. "routing_key": queue,
  24. "exchange": queue,
  25. "exchange_type": "direct"}
  26. return q
  27. def route(self, options, task, args=(), kwargs={}):
  28. # Expand "queue" keys in options.
  29. options = self.expand_destination(options)
  30. if self.routes:
  31. route = self.lookup_route(task, args, kwargs)
  32. if route:
  33. # Also expand "queue" keys in route.
  34. return dict(options, **self.expand_destination(route))
  35. return options
  36. def expand_destination(self, route):
  37. # The route can simply be a queue name,
  38. # this is convenient for direct exchanges.
  39. if isinstance(route, basestring):
  40. queue, route = route, {}
  41. else:
  42. # For topic exchanges you can use the defaults from a queue
  43. # definition, and override e.g. just the routing_key.
  44. queue = route.pop("queue", None)
  45. if queue:
  46. try:
  47. dest = dict(self.queues[queue])
  48. except KeyError:
  49. if self.create_missing:
  50. dest = self.add_queue(queue)
  51. else:
  52. raise QueueNotFound(
  53. "Queue '%s' is not defined in CELERY_QUEUES" % queue)
  54. dest.setdefault("routing_key", dest.get("binding_key"))
  55. return dict(dest, **route)
  56. return route
  57. def lookup_route(self, task, args=None, kwargs=None):
  58. return _first_route(self.routes, task, args, kwargs)
  59. def prepare(routes):
  60. """Expand ROUTES setting."""
  61. def expand_route(route):
  62. if isinstance(route, dict):
  63. return MapRoute(route)
  64. if isinstance(route, basestring):
  65. return mpromise(instantiate, route)
  66. return route
  67. if not isinstance(routes, (list, tuple)):
  68. routes = (routes, )
  69. return map(expand_route, routes)