# routes.py
  1. from celery.exceptions import QueueNotFound
  2. from celery.utils import instantiate, firstmethod, mpromise
# Callable used by Router.lookup_route, where it is invoked as
# `_first_route(routers, task, args, kwargs)`.
# NOTE(review): presumably calls `route_for_task` on each router in turn and
# returns the first usable result -- confirm against `celery.utils.firstmethod`.
_first_route = firstmethod("route_for_task")
  4. def merge(a, b):
  5. """Like `dict(a, **b)` except it will keep values from `a`, if the value
  6. in `b` is :const:`None`."""
  7. return dict(a, **dict((k, v) for k, v in b.iteritems() if v is not None))
  8. class MapRoute(object):
  9. """Creates a router out of a :class:`dict`."""
  10. def __init__(self, map):
  11. self.map = map
  12. def route_for_task(self, task, *args, **kwargs):
  13. route = self.map.get(task)
  14. if route:
  15. return dict(route)
  16. class Router(object):
  17. def __init__(self, routes=None, queues=None, create_missing=False,
  18. app=None):
  19. from celery.app import app_or_default
  20. if queues is None:
  21. queues = {}
  22. if routes is None:
  23. routes = []
  24. self.app = app_or_default(app)
  25. self.queues = queues
  26. self.routes = routes
  27. self.create_missing = create_missing
  28. def route(self, options, task, args=(), kwargs={}):
  29. # Expand "queue" keys in options.
  30. options = self.expand_destination(options)
  31. if self.routes:
  32. route = self.lookup_route(task, args, kwargs)
  33. if route:
  34. # Also expand "queue" keys in route.
  35. return merge(self.expand_destination(route), options)
  36. return options
  37. def expand_destination(self, route):
  38. # The route can simply be a queue name,
  39. # this is convenient for direct exchanges.
  40. if isinstance(route, basestring):
  41. queue, route = route, {}
  42. else:
  43. # For topic exchanges you can use the defaults from a queue
  44. # definition, and override e.g. just the routing_key.
  45. queue = route.pop("queue", None)
  46. if queue:
  47. try:
  48. dest = dict(self.queues[queue])
  49. except KeyError:
  50. if self.create_missing:
  51. dest = self.app.amqp.queues.add(queue, queue, queue)
  52. else:
  53. raise QueueNotFound(
  54. "Queue '%s' is not defined in CELERY_QUEUES" % queue)
  55. dest.setdefault("routing_key", dest.get("binding_key"))
  56. return merge(dest, route)
  57. return route
  58. def lookup_route(self, task, args=None, kwargs=None):
  59. return _first_route(self.routes, task, args, kwargs)
  60. def prepare(routes):
  61. """Expands the :setting:`CELERY_ROUTES` setting."""
  62. def expand_route(route):
  63. if isinstance(route, dict):
  64. return MapRoute(route)
  65. if isinstance(route, basestring):
  66. return mpromise(instantiate, route)
  67. return route
  68. if not isinstance(routes, (list, tuple)):
  69. routes = (routes, )
  70. return map(expand_route, routes)