# routes.py (2.9 KB)
  1. from celery.exceptions import QueueNotFound
  2. from celery.utils import instantiate, firstmethod, mpromise
# Callable produced by ``firstmethod`` for the ``route_for_task`` method name;
# ``Router.lookup_route`` invokes it as
# ``_first_route(routes, task, args, kwargs)``.
# NOTE(review): presumably returns the first non-None ``route_for_task``
# answer among ``routes`` -- confirm against ``celery.utils.firstmethod``.
_first_route = firstmethod("route_for_task")
  4. def merge(a, b):
  5. """Like ``dict(a, **b)`` except it will keep values from ``a``,
  6. if the value in ``b`` is :const:`None`."""
  7. return dict(a, **dict((k, v) for k, v in b.iteritems() if v is not None))
  8. class MapRoute(object):
  9. """Makes a router out of a :class:`dict`."""
  10. def __init__(self, map):
  11. self.map = map
  12. def route_for_task(self, task, *args, **kwargs):
  13. route = self.map.get(task)
  14. if route:
  15. return dict(route)
  16. class Router(object):
  17. def __init__(self, routes=None, queues=None, create_missing=False):
  18. if queues is None:
  19. queues = {}
  20. if routes is None:
  21. routes = []
  22. self.queues = queues
  23. self.routes = routes
  24. self.create_missing = create_missing
  25. def add_queue(self, queue):
  26. q = self.queues[queue] = {"binding_key": queue,
  27. "routing_key": queue,
  28. "exchange": queue,
  29. "exchange_type": "direct"}
  30. return q
  31. def route(self, options, task, args=(), kwargs={}):
  32. # Expand "queue" keys in options.
  33. options = self.expand_destination(options)
  34. if self.routes:
  35. route = self.lookup_route(task, args, kwargs)
  36. if route:
  37. # Also expand "queue" keys in route.
  38. return merge(options, self.expand_destination(route))
  39. return options
  40. def expand_destination(self, route):
  41. # The route can simply be a queue name,
  42. # this is convenient for direct exchanges.
  43. if isinstance(route, basestring):
  44. queue, route = route, {}
  45. else:
  46. # For topic exchanges you can use the defaults from a queue
  47. # definition, and override e.g. just the routing_key.
  48. queue = route.pop("queue", None)
  49. if queue:
  50. try:
  51. dest = dict(self.queues[queue])
  52. except KeyError:
  53. if self.create_missing:
  54. dest = self.add_queue(queue)
  55. else:
  56. raise QueueNotFound(
  57. "Queue '%s' is not defined in CELERY_QUEUES" % queue)
  58. dest.setdefault("routing_key", dest.get("binding_key"))
  59. return merge(dest, route)
  60. return route
  61. def lookup_route(self, task, args=None, kwargs=None):
  62. return _first_route(self.routes, task, args, kwargs)
  63. def prepare(routes):
  64. """Expand ROUTES setting."""
  65. def expand_route(route):
  66. if isinstance(route, dict):
  67. return MapRoute(route)
  68. if isinstance(route, basestring):
  69. return mpromise(instantiate, route)
  70. return route
  71. if not isinstance(routes, (list, tuple)):
  72. routes = (routes, )
  73. return map(expand_route, routes)