routes.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. from __future__ import absolute_import
  2. from celery.exceptions import QueueNotFound
  3. from celery.utils import firstmethod, instantiate, lpmerge, mpromise
# Module-level dispatcher, built once from ``firstmethod``.
# ``Router.lookup_route`` calls it as
# ``_first_route(routers, task, args, kwargs)``.
# NOTE(review): presumably this invokes ``route_for_task`` on each router in
# order and returns the first non-empty result -- confirm against
# ``celery.utils.firstmethod``.
_first_route = firstmethod("route_for_task")
  5. class MapRoute(object):
  6. """Creates a router out of a :class:`dict`."""
  7. def __init__(self, map):
  8. self.map = map
  9. def route_for_task(self, task, *args, **kwargs):
  10. route = self.map.get(task)
  11. if route:
  12. return dict(route)
  13. class Router(object):
  14. def __init__(self, routes=None, queues=None, create_missing=False,
  15. app=None):
  16. from celery.app import app_or_default
  17. self.app = app_or_default(app)
  18. self.queues = {} if queues is None else queues
  19. self.routes = [] if routes is None else routes
  20. self.create_missing = create_missing
  21. def route(self, options, task, args=(), kwargs={}):
  22. options = self.expand_destination(options) # expands 'queue'
  23. if self.routes:
  24. route = self.lookup_route(task, args, kwargs)
  25. if route: # expands 'queue' in route.
  26. return lpmerge(self.expand_destination(route), options)
  27. if "queue" not in options:
  28. options = lpmerge(self.expand_destination(
  29. self.app.conf.CELERY_DEFAULT_QUEUE), options)
  30. return options
  31. def expand_destination(self, route):
  32. # Route can be a queue name: convenient for direct exchanges.
  33. if isinstance(route, basestring):
  34. queue, route = route, {}
  35. else:
  36. # can use defaults from configured queue, but override specific
  37. # things (like the routing_key): great for topic exchanges.
  38. queue = route.pop("queue", None)
  39. if queue: # expand config from configured queue.
  40. try:
  41. dest = dict(self.queues[queue])
  42. except KeyError:
  43. if not self.create_missing:
  44. raise QueueNotFound(
  45. "Queue %r is not defined in CELERY_QUEUES" % queue)
  46. dest = dict(self.app.amqp.queues.add(queue, queue, queue))
  47. # needs to be declared by publisher
  48. dest["queue"] = queue
  49. # routing_key and binding_key are synonyms.
  50. dest.setdefault("routing_key", dest.get("binding_key"))
  51. return lpmerge(dest, route)
  52. return route
  53. def lookup_route(self, task, args=None, kwargs=None):
  54. return _first_route(self.routes, task, args, kwargs)
  55. def prepare(routes):
  56. """Expands the :setting:`CELERY_ROUTES` setting."""
  57. def expand_route(route):
  58. if isinstance(route, dict):
  59. return MapRoute(route)
  60. if isinstance(route, basestring):
  61. return mpromise(instantiate, route)
  62. return route
  63. if routes is None:
  64. return ()
  65. if not isinstance(routes, (list, tuple)):
  66. routes = (routes, )
  67. return map(expand_route, routes)