from celery.exceptions import QueueNotFound from celery.utils import instantiate, firstmethod, mpromise _first_route = firstmethod("route_for_task") def merge(a, b): """Like `dict(a, **b)` except it will keep values from `a`, if the value in `b` is :const:`None`.""" return dict(a, **dict((k, v) for k, v in b.iteritems() if v is not None)) class MapRoute(object): """Creates a router out of a :class:`dict`.""" def __init__(self, map): self.map = map def route_for_task(self, task, *args, **kwargs): route = self.map.get(task) if route: return dict(route) class Router(object): def __init__(self, routes=None, queues=None, create_missing=False, app=None): from celery.app import app_or_default if queues is None: queues = {} if routes is None: routes = [] self.app = app_or_default(app) self.queues = queues self.routes = routes self.create_missing = create_missing def route(self, options, task, args=(), kwargs={}): # Expand "queue" keys in options. options = self.expand_destination(options) if self.routes: route = self.lookup_route(task, args, kwargs) if route: # Also expand "queue" keys in route. return merge(self.expand_destination(route), options) return options def expand_destination(self, route): # The route can simply be a queue name, # this is convenient for direct exchanges. if isinstance(route, basestring): queue, route = route, {} else: # For topic exchanges you can use the defaults from a queue # definition, and override e.g. just the routing_key. queue = route.pop("queue", None) if queue: try: dest = dict(self.queues[queue]) except KeyError: if self.create_missing: dest = self.app.amqp.queues.add(queue, queue, queue) else: raise QueueNotFound( "Queue '%s' is not defined in CELERY_QUEUES" % queue) dest.setdefault("routing_key", dest.get("binding_key")) return merge(dest, route) return route def lookup_route(self, task, args=None, kwargs=None): return _first_route(self.routes, task, args, kwargs) def prepare(routes): """Expands the :setting:`CELERY_ROUTES` setting.""" def expand_route(route): if isinstance(route, dict): return MapRoute(route) if isinstance(route, basestring): return mpromise(instantiate, route) return route if not isinstance(routes, (list, tuple)): routes = (routes, ) return map(expand_route, routes)