123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- # -*- coding: utf-8 -*-
- """
- celery.routes
- ~~~~~~~~~~~~~
- Contains utilities for working with task routers,
- (:setting:`CELERY_ROUTES`).
- """
- from __future__ import absolute_import
- from celery.exceptions import QueueNotFound
- from celery.five import string_t
- from celery.utils import lpmerge
- from celery.utils.functional import firstmethod, mlazy
- from celery.utils.imports import instantiate
# Public names of this module.
__all__ = [
    'MapRoute',
    'Router',
    'prepare',
]

# Helper built by ``firstmethod`` for the ``route_for_task`` method name.
# ``Router.lookup_route`` calls it with the list of routers followed by the
# task name, args and kwargs.
# NOTE(review): presumably it returns the first truthy ``route_for_task``
# result among the routers -- confirm against celery.utils.functional.
_first_route = firstmethod('route_for_task')
class MapRoute(object):
    """Router backed by a plain mapping of task name to route options."""

    def __init__(self, map):
        # Kept by reference: later changes to the mapping are seen by
        # subsequent lookups.
        self.map = map

    def route_for_task(self, task, *args, **kwargs):
        """Return a fresh :class:`dict` with the route stored for ``task``.

        Extra positional and keyword arguments are accepted and ignored.
        Returns :const:`None` when the lookup raises :exc:`KeyError`,
        i.e. when there is no entry for ``task``.
        """
        try:
            # Copy so callers can modify the result without touching
            # the configured mapping.
            options = dict(self.map[task])
        except KeyError:
            return None
        return options
class Router(object):
    """Computes the final message options for a task.

    Combines the options given by the caller with the first matching
    entry from ``routes`` and, when no queue was selected, with the
    ``CELERY_DEFAULT_QUEUE`` setting of ``app``.

    :keyword routes: list of router objects consulted by
        :meth:`lookup_route` (defaults to an empty list).
    :keyword queues: mapping used to resolve queue names
        (defaults to an empty dict).
    :keyword create_missing: stored on the instance; not read by any
        method of this class.
    :keyword app: application whose ``conf.CELERY_DEFAULT_QUEUE`` is
        used as the fallback destination.
    """

    def __init__(self, routes=None, queues=None,
                 create_missing=False, app=None):
        self.app = app
        self.queues = {} if queues is None else queues
        self.routes = [] if routes is None else routes
        self.create_missing = create_missing

    def route(self, options, task, args=(), kwargs=None):
        """Return the routing options to use for ``task``.

        :param options: options supplied by the caller; a queue name
            under ``'queue'`` is expanded.  The passed dict is not
            modified.
        :param task: name of the task being routed.
        :keyword args: positional task arguments, forwarded to routers.
        :keyword kwargs: keyword task arguments, forwarded to routers.
            Defaults to a new empty dict for every call.

        :raises QueueNotFound: if a named queue is not in ``queues``.
        """
        if kwargs is None:
            # A fresh dict per call: a ``{}`` default in the signature
            # would be one object shared by every call and every router.
            kwargs = {}
        options = self.expand_destination(options)  # expands 'queue'
        if self.routes:
            route = self.lookup_route(task, args, kwargs)
            if route:  # expands 'queue' in route.
                # expand_destination returns a dict owned by us, so the
                # merge cannot write into the router's own data.
                return lpmerge(self.expand_destination(route), options)
        if 'queue' not in options:
            # Nothing selected a queue: fall back to the default queue.
            options = lpmerge(self.expand_destination(
                self.app.conf.CELERY_DEFAULT_QUEUE), options)
        return options

    def expand_destination(self, route):
        """Return ``route`` with its queue name replaced by the queue.

        :param route: either a queue name or a mapping of options that
            may contain a queue name under ``'queue'``.

        :returns: a new dict.  When a non-empty queue name was given,
            ``'queue'`` holds ``self.queues[name]``; otherwise the
            result has no ``'queue'`` key.

        :raises QueueNotFound: if the name is missing from ``queues``.
        """
        # Route can be a queue name: convenient for direct exchanges.
        if isinstance(route, string_t):
            queue, route = route, {}
        else:
            # Work on a shallow copy.  Popping/overwriting 'queue' on the
            # caller's dict (or on a dict a router returns every time)
            # would leave the queue object where the name used to be and
            # break the next lookup for that same dict.
            route = dict(route)
            # can use defaults from configured queue, but override specific
            # things (like the routing_key): great for topic exchanges.
            queue = route.pop('queue', None)
        if queue:
            try:
                Q = self.queues[queue]  # noqa
            except KeyError:
                raise QueueNotFound(
                    'Queue {0!r} missing from CELERY_QUEUES'.format(queue))
            # needs to be declared by publisher
            route['queue'] = Q
        return route

    def lookup_route(self, task, args=None, kwargs=None):
        """Ask the configured routers for a route for ``task``.

        Delegates to the module-level ``_first_route`` helper with
        ``self.routes`` and returns whatever it returns.
        """
        return _first_route(self.routes, task, args, kwargs)
def prepare(routes):
    """Normalize the value of the :setting:`CELERY_ROUTES` setting.

    :param routes: :const:`None`, a single route, or a list/tuple of
        routes.

    :returns: an empty tuple for :const:`None`; otherwise a list with
        one entry per route, where dicts are wrapped in
        :class:`MapRoute`, strings become ``mlazy(instantiate, name)``
        and anything else is passed through unchanged.
    """
    if routes is None:
        return ()

    def _as_router(entry):
        # The dict check comes first, then the string check; everything
        # else is taken to be a router object already.
        if isinstance(entry, dict):
            return MapRoute(entry)
        elif isinstance(entry, string_t):
            return mlazy(instantiate, entry)
        return entry

    # A lone route (dict, string or router object) is treated as a
    # one-element sequence.
    entries = routes if isinstance(routes, (list, tuple)) else (routes, )
    return list(map(_as_router, entries))
|