routes.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. from celery.exceptions import QueueNotFound
  2. from celery.utils import firstmethod, instantiate, lpmerge, mpromise
# Dispatcher built by ``firstmethod`` for the ``route_for_task`` method name.
# ``Router.lookup_route`` calls it as ``_first_route(routes, task, args, kwargs)``.
# NOTE(review): presumably it returns the first non-empty ``route_for_task``
# result among the given routers -- confirm against ``celery.utils.firstmethod``.
_first_route = firstmethod("route_for_task")
  4. class MapRoute(object):
  5. """Creates a router out of a :class:`dict`."""
  6. def __init__(self, map):
  7. self.map = map
  8. def route_for_task(self, task, *args, **kwargs):
  9. route = self.map.get(task)
  10. if route:
  11. return dict(route)
  12. class Router(object):
  13. def __init__(self, routes=None, queues=None, create_missing=False,
  14. app=None):
  15. from celery.app import app_or_default
  16. self.app = app_or_default(app)
  17. if queues is None:
  18. queues = {}
  19. if routes is None:
  20. routes = []
  21. self.queues = queues
  22. self.routes = routes
  23. self.create_missing = create_missing
  24. def route(self, options, task, args=(), kwargs={}):
  25. options = self.expand_destination(options) # expands 'queue'
  26. if self.routes:
  27. route = self.lookup_route(task, args, kwargs)
  28. if route: # expands 'queue' in route.
  29. return lpmerge(self.expand_destination(route), options)
  30. if "queue" not in options:
  31. options = lpmerge(self.expand_destination(
  32. self.app.conf.CELERY_DEFAULT_QUEUE), options)
  33. return options
  34. def expand_destination(self, route):
  35. # Route can be a queue name: convenient for direct exchanges.
  36. if isinstance(route, basestring):
  37. queue, route = route, {}
  38. else:
  39. # can use defaults from configured queue, but override specific
  40. # things (like the routing_key): great for topic exchanges.
  41. queue = route.pop("queue", None)
  42. if queue: # expand config from configured queue.
  43. try:
  44. dest = dict(self.queues[queue])
  45. except KeyError:
  46. if not self.create_missing:
  47. raise QueueNotFound(
  48. "Queue %r is not defined in CELERY_QUEUES" % queue)
  49. dest = dict(self.app.amqp.queues.add(queue, queue, queue))
  50. # needs to be declared by publisher
  51. dest["queue"] = queue
  52. # routing_key and binding_key are synonyms.
  53. dest.setdefault("routing_key", dest.get("binding_key"))
  54. return lpmerge(dest, route)
  55. return route
  56. def lookup_route(self, task, args=None, kwargs=None):
  57. return _first_route(self.routes, task, args, kwargs)
  58. def prepare(routes):
  59. """Expands the :setting:`CELERY_ROUTES` setting."""
  60. def expand_route(route):
  61. if isinstance(route, dict):
  62. return MapRoute(route)
  63. if isinstance(route, basestring):
  64. return mpromise(instantiate, route)
  65. return route
  66. if routes is None:
  67. return ()
  68. if not isinstance(routes, (list, tuple)):
  69. routes = (routes, )
  70. return map(expand_route, routes)