routes.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. from celery.exceptions import QueueNotFound
  2. from celery.utils import firstmethod, instantiate, lpmerge, mpromise
  # Module-level callable used by ``Router.lookup_route`` to query the
  # configured routers for a task.
  # NOTE(review): ``firstmethod`` is imported from ``celery.utils`` and is not
  # visible here; presumably it builds a function that calls
  # ``route_for_task`` on each router in turn and returns the first usable
  # result -- confirm against ``celery.utils.firstmethod``.
  3. _first_route = firstmethod("route_for_task")
  4. class MapRoute(object):
  5. """Creates a router out of a :class:`dict`."""
  6. def __init__(self, map):
  7. self.map = map
  8. def route_for_task(self, task, *args, **kwargs):
  9. route = self.map.get(task)
  10. if route:
  11. return dict(route)
  12. class Router(object):
  13. def __init__(self, routes=None, queues=None, create_missing=False,
  14. app=None):
  15. from celery.app import app_or_default
  16. self.app = app_or_default(app)
  17. if queues is None:
  18. queues = {}
  19. if routes is None:
  20. routes = []
  21. self.queues = queues
  22. self.routes = routes
  23. self.create_missing = create_missing
  24. def route(self, options, task, args=(), kwargs={}):
  25. options = self.expand_destination(options) # expands 'queue'
  26. if self.routes:
  27. route = self.lookup_route(task, args, kwargs)
  28. if route: # expands 'queue' in route.
  29. return lpmerge(self.expand_destination(route), options)
  30. return options
  31. def expand_destination(self, route):
  32. # Route can be a queue name: convenient for direct exchanges.
  33. if isinstance(route, basestring):
  34. queue, route = route, {}
  35. else:
  36. # can use defaults from configured queue, but override specific
  37. # things (like the routing_key): great for topic exchanges.
  38. queue = route.pop("queue", None)
  39. if queue: # expand config from configured queue.
  40. try:
  41. dest = dict(self.queues[queue])
  42. except KeyError:
  43. if not self.create_missing:
  44. raise QueueNotFound(
  45. "Queue %r is not defined in CELERY_QUEUES" % queue)
  46. dest = dict(self.app.amqp.queues.add(queue, queue, queue))
  47. # needs to be declared by publisher
  48. dest["queue"] = queue
  49. # routing_key and binding_key are synonyms.
  50. dest.setdefault("routing_key", dest.get("binding_key"))
  51. return lpmerge(dest, route)
  52. return route
  53. def lookup_route(self, task, args=None, kwargs=None):
  54. return _first_route(self.routes, task, args, kwargs)
  55. def prepare(routes):
  56. """Expands the :setting:`CELERY_ROUTES` setting."""
  57. def expand_route(route):
  58. if isinstance(route, dict):
  59. return MapRoute(route)
  60. if isinstance(route, basestring):
  61. return mpromise(instantiate, route)
  62. return route
  63. if routes is None:
  64. return ()
  65. if not isinstance(routes, (list, tuple)):
  66. routes = (routes, )
  67. return map(expand_route, routes)