1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- from celery.exceptions import QueueNotFound
- from celery.utils import instantiate, firstmethod, mpromise
# Module-level helper used by Router.lookup_route, built by `firstmethod`
# for the "route_for_task" method name.
# NOTE(review): presumably it calls `route_for_task` on each router in turn
# and returns the first usable answer -- confirm against celery.utils.
_first_route = firstmethod("route_for_task")
def merge(a, b):
    """Like `dict(a, **b)` except it will keep values from `a`, if the value
    in `b` is :const:`None`.

    A new dict is returned; neither `a` nor `b` is modified.  Keys present
    only in `b` are added unless their value is :const:`None`.
    """
    merged = dict(a)
    # `.items()` works on both Python 2 and 3, and `update` (unlike `**`
    # expansion) does not require the keys to be strings.
    merged.update((key, value) for key, value in b.items()
                  if value is not None)
    return merged
class MapRoute(object):
    """Router that answers lookups from a :class:`dict` of routes."""

    def __init__(self, map):
        self.map = map

    def route_for_task(self, task, *args, **kwargs):
        """Return a copy of the route stored for `task`.

        Returns :const:`None` when `task` has no entry in the map, or
        when the stored entry is empty.  Extra arguments are accepted
        and ignored.
        """
        entry = self.map.get(task)
        if not entry:
            return None
        # Hand out a copy so callers cannot alter the stored route.
        return dict(entry)
class Router(object):
    """Resolve the final message options for a task.

    :keyword routes: sequence of router objects consulted by
        :meth:`lookup_route`.  Defaults to an empty list.
    :keyword queues: mapping of queue name to queue options, used to
        expand ``"queue"`` references.  Defaults to an empty dict.
    :keyword create_missing: if true, a queue name that is not in
        `queues` is added through ``app.amqp.queues.add`` instead of
        raising :exc:`QueueNotFound`.
    :keyword app: application instance, resolved with ``app_or_default``.
    """

    def __init__(self, routes=None, queues=None, create_missing=False,
            app=None):
        from celery.app import app_or_default
        if queues is None:
            queues = {}
        if routes is None:
            routes = []
        self.app = app_or_default(app)
        self.queues = queues
        self.routes = routes
        self.create_missing = create_missing

    def route(self, options, task, args=(), kwargs=None):
        """Return the message options to use for `task`.

        `options` is expanded with :meth:`expand_destination` first.  If
        any routers are configured and one of them yields a route, that
        route is expanded too and `options` is merged on top of it, so
        values in `options` take precedence unless they are
        :const:`None`.  Otherwise the expanded `options` are returned.

        `args` and `kwargs` are only passed on to the routers.
        """
        if kwargs is None:
            # Fresh dict per call: a shared default could be modified by
            # a router and leak into later calls.
            kwargs = {}
        # Expand "queue" keys in options.
        options = self.expand_destination(options)
        if self.routes:
            route = self.lookup_route(task, args, kwargs)
            if route:
                # Also expand "queue" keys in route.
                return merge(self.expand_destination(route), options)
        return options

    def expand_destination(self, route):
        """Expand a queue reference in `route` into full queue options.

        `route` is either a queue name or a dict of options that may
        contain a ``"queue"`` key.  When a queue is named, its definition
        is looked up in ``self.queues``, ``"routing_key"`` defaults to
        the definition's ``"binding_key"``, and the remaining keys of
        `route` are merged on top (:const:`None` values do not override).

        The ``"queue"`` key is popped from a dict `route` in place.  A
        dict that names no queue is returned as is.

        :raises QueueNotFound: if the queue is not defined and
            ``create_missing`` is false.
        """
        # The route can simply be a queue name,
        # this is convenient for direct exchanges.
        if isinstance(route, basestring):
            queue, route = route, {}
        else:
            # For topic exchanges you can use the defaults from a queue
            # definition, and override e.g. just the routing_key.
            queue = route.pop("queue", None)

        if queue:
            try:
                dest = dict(self.queues[queue])
            except KeyError:
                if self.create_missing:
                    # NOTE(review): unlike the lookup above, this result
                    # is not copied, so the setdefault() below writes
                    # into whatever object add() returned -- confirm
                    # that this is intended.
                    dest = self.app.amqp.queues.add(queue, queue, queue)
                else:
                    raise QueueNotFound(
                        "Queue '%s' is not defined in CELERY_QUEUES" % queue)
            dest.setdefault("routing_key", dest.get("binding_key"))
            return merge(dest, route)

        return route

    def lookup_route(self, task, args=None, kwargs=None):
        """Ask the configured routers for a route for `task`.

        Delegates to the module-level ``_first_route`` helper, passing
        it ``self.routes`` together with `task`, `args` and `kwargs`.
        """
        return _first_route(self.routes, task, args, kwargs)
def prepare(routes):
    """Expands the :setting:`CELERY_ROUTES` setting.

    `routes` may be a single route or a list/tuple of routes; a single
    value is treated as a one-element sequence.  Each route is expanded
    as follows:

    * a :class:`dict` is wrapped in :class:`MapRoute`,
    * a string is turned into ``mpromise(instantiate, route)``
      (presumably a lazily instantiated router class named by the
      string -- confirm against celery.utils),
    * anything else is used as is.

    Always returns a :class:`list`, so the result can be truth-tested
    and iterated more than once.
    """

    def expand_route(route):
        if isinstance(route, dict):
            return MapRoute(route)
        if isinstance(route, basestring):
            return mpromise(instantiate, route)
        return route

    if not isinstance(routes, (list, tuple)):
        routes = (routes, )
    # A list comprehension instead of map(): on Python 3 map() returns a
    # one-shot iterator that is always truthy.
    return [expand_route(route) for route in routes]
|