routes.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. from celery.exceptions import QueueNotFound
  2. from celery.utils import instantiate, firstmethod
# Dispatcher built by ``firstmethod`` for the method name "route_for_task".
# Router.lookup_route calls it as ``_first_route(routes, task, args, kwargs)``.
# NOTE(review): presumably it tries ``route_for_task`` on each router in
# order and returns the first usable answer -- confirm against
# celery.utils.firstmethod.
_first_route = firstmethod("route_for_task")
  4. class MapRoute(object):
  5. """Makes a router out of a :class:`dict`."""
  6. def __init__(self, map):
  7. self.map = map
  8. def route_for_task(self, task, *args, **kwargs):
  9. return self.map.get(task)
  10. class Router(object):
  11. def __init__(self, routes=None, queues=None, create_missing=False):
  12. if queues is None:
  13. queues = {}
  14. if routes is None:
  15. routes = []
  16. self.queues = queues
  17. self.routes = routes
  18. self.create_missing = create_missing
  19. def add_queue(self, queue):
  20. q = self.queues[queue] = {"binding_key": queue,
  21. "routing_key": queue,
  22. "exchange": queue,
  23. "exchange_type": "direct"}
  24. return q
  25. def route(self, options, task, args=(), kwargs={}):
  26. # Expand "queue" keys in options.
  27. options = self.expand_destination(options)
  28. if self.routes:
  29. route = self.lookup_route(task, args, kwargs)
  30. if route:
  31. # Also expand "queue" keys in route.
  32. return dict(options, **self.expand_destination(route))
  33. return options
  34. def expand_destination(self, route):
  35. # The route can simply be a queue name,
  36. # this is convenient for direct exchanges.
  37. if isinstance(route, basestring):
  38. queue, route = route, {}
  39. else:
  40. # For topic exchanges you can use the defaults from a queue
  41. # definition, and override e.g. just the routing_key.
  42. queue = route.pop("queue", None)
  43. if queue:
  44. try:
  45. dest = dict(self.queues[queue])
  46. except KeyError:
  47. if self.create_missing:
  48. dest = self.add_queue(queue)
  49. else:
  50. raise QueueNotFound(
  51. "Queue '%s' is not defined in CELERY_QUEUES" % queue)
  52. dest.setdefault("routing_key", dest.get("binding_key"))
  53. return dict(route, **dest)
  54. return route
  55. def lookup_route(self, task, args=None, kwargs=None):
  56. return _first_route(self.routes, task, args, kwargs)
  57. def prepare(routes):
  58. """Expand ROUTES setting."""
  59. def expand_route(route):
  60. if isinstance(route, dict):
  61. return MapRoute(route)
  62. if isinstance(route, basestring):
  63. return instantiate(route)
  64. return route
  65. if not hasattr(routes, "__iter__"):
  66. routes = (routes, )
  67. return map(expand_route, routes)