123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- from celery.exceptions import QueueNotFound
- from celery.utils import instantiate, firstmethod, mpromise
# Matcher used by Router.lookup_route: it is called with the list of routers
# followed by the task name, args and kwargs.
# NOTE(review): presumably returns the first non-None ``route_for_task``
# answer among the routers -- confirm against celery.utils.firstmethod.
_first_route = firstmethod("route_for_task")
class MapRoute(object):
    """Makes a router out of a :class:`dict`.

    The mapping is keyed by task name.  Each value is either a dict of
    routing options or a plain queue name.
    """

    def __init__(self, map):
        # ``map`` shadows the builtin, but the parameter name is part of the
        # public signature, so it is kept for keyword callers.
        self.map = map

    def route_for_task(self, task, *args, **kwargs):
        """Return the route configured for ``task``.

        A dict entry is returned as a new dict, so the caller can modify
        the result without altering the configured mapping.  A plain queue
        name is returned as ``{"queue": name}``.  Returns ``None`` when the
        task has no entry or the entry is empty.  Any extra positional or
        keyword arguments are accepted and ignored.
        """
        route = self.map.get(task)
        if not route:
            return None
        try:
            return dict(route)
        except ValueError:
            # dict() cannot be built from a string: the entry is a bare
            # queue name, so express it in the dict form.
            return {"queue": route}
try:
    _route_string_types = basestring  # Python 2: matches str and unicode
except NameError:
    _route_string_types = str  # Python 3 has no basestring


class Router(object):
    """Resolve the final delivery options for a task.

    ``routes`` is the list of router objects consulted for each task,
    ``queues`` maps queue names to their option dicts, and
    ``create_missing`` controls whether an unknown queue name is declared
    on the fly instead of raising :exc:`QueueNotFound`.
    """

    def __init__(self, routes=None, queues=None, create_missing=False):
        if queues is None:
            queues = {}
        if routes is None:
            routes = []
        self.queues = queues
        self.routes = routes
        self.create_missing = create_missing

    def add_queue(self, queue):
        """Register ``queue`` as a direct exchange and return its options.

        The queue name is used as the exchange name, binding key and
        routing key.  The returned dict is the one stored in
        ``self.queues``.
        """
        q = self.queues[queue] = {"binding_key": queue,
                                  "routing_key": queue,
                                  "exchange": queue,
                                  "exchange_type": "direct"}
        return q

    def route(self, options, task, args=(), kwargs=None):
        """Return the delivery options for ``task``.

        ``options`` is expanded first; if the configured routers yield a
        route for the task, the expanded route is layered on top, so route
        values win on conflicting keys.  Neither ``options`` nor the dict
        returned by a router is modified.

        Raises :exc:`QueueNotFound` (via :meth:`expand_destination`) for an
        unknown queue when ``create_missing`` is false.
        """
        if kwargs is None:
            # Fresh dict per call instead of a shared mutable default.
            kwargs = {}
        # Expand "queue" keys in options.
        options = self.expand_destination(options)
        if self.routes:
            route = self.lookup_route(task, args, kwargs)
            if route:
                # Also expand "queue" keys in route.
                return dict(options, **self.expand_destination(route))
        return options

    def expand_destination(self, route):
        """Expand a queue reference in ``route`` into that queue's options.

        ``route`` is either a queue name or a dict that may carry a
        ``"queue"`` key.  The named queue's options are used as defaults
        (with ``routing_key`` falling back to ``binding_key``) and the
        remaining keys of ``route`` override them.  Without a queue
        reference the route's own keys are returned.  A new dict is always
        returned; the argument is never modified.

        Raises :exc:`QueueNotFound` when the queue is unknown and
        ``create_missing`` is false.
        """
        if isinstance(route, _route_string_types):
            # The route can simply be a queue name,
            # this is convenient for direct exchanges.
            queue, route = route, {}
        else:
            # For topic exchanges you can use the defaults from a queue
            # definition, and override e.g. just the routing_key.
            # Work on a copy: popping from the caller's dict would strip
            # "queue" from a route object that is reused across calls.
            route = dict(route)
            queue = route.pop("queue", None)

        if not queue:
            return route

        try:
            dest = dict(self.queues[queue])
        except KeyError:
            if not self.create_missing:
                raise QueueNotFound(
                    "Queue '%s' is not defined in CELERY_QUEUES" % queue)
            dest = self.add_queue(queue)
        dest.setdefault("routing_key", dest.get("binding_key"))
        return dict(dest, **route)

    def lookup_route(self, task, args=None, kwargs=None):
        """Ask the configured routers for a route for ``task``.

        Delegates to the module-level ``_first_route`` matcher, passing the
        router list and the task arguments through unchanged.
        """
        return _first_route(self.routes, task, args, kwargs)
def prepare(routes):
    """Expand ROUTES setting.

    ``routes`` may be a single router or a list/tuple of routers.  Each
    entry is converted as follows:

    * a dict becomes a :class:`MapRoute`;
    * a string is wrapped in ``mpromise(instantiate, route)``
      (NOTE(review): presumably a dotted class path instantiated on first
      use -- confirm against celery.utils);
    * anything else is taken to be a router object and kept as is.

    Always returns a list, so the result can be truth-tested and iterated
    more than once.  ``None`` (setting not configured) gives an empty list.
    """
    try:
        string_types = basestring  # Python 2: matches str and unicode
    except NameError:
        string_types = str  # Python 3 has no basestring

    def expand_route(route):
        if isinstance(route, dict):
            return MapRoute(route)
        if isinstance(route, string_types):
            return mpromise(instantiate, route)
        return route

    if routes is None:
        return []
    if not isinstance(routes, (list, tuple)):
        routes = (routes, )
    # A list rather than map(): on Python 3 map() is a lazy, single-use
    # iterator that is always truthy.
    return [expand_route(route) for route in routes]
|