12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- # -*- coding: utf-8 -*-
- """
- celery.routes
- ~~~~~~~~~~~~~
- Contains utilities for working with task routers,
- (:setting:`CELERY_ROUTES`).
- """
- from __future__ import absolute_import
- from celery.exceptions import QueueNotFound
- from celery.utils import lpmerge
- from celery.utils.functional import firstmethod, mpromise
- from celery.utils.imports import instantiate
- _first_route = firstmethod('route_for_task')
- class MapRoute(object):
- """Creates a router out of a :class:`dict`."""
- def __init__(self, map):
- self.map = map
- def route_for_task(self, task, *args, **kwargs):
- route = self.map.get(task)
- if route:
- return dict(route)
- class Router(object):
- def __init__(self, routes=None, queues=None, create_missing=False,
- app=None):
- self.app = app
- self.queues = {} if queues is None else queues
- self.routes = [] if routes is None else routes
- self.create_missing = create_missing
- def route(self, options, task, args=(), kwargs={}):
- options = self.expand_destination(options) # expands 'queue'
- if self.routes:
- route = self.lookup_route(task, args, kwargs)
- if route: # expands 'queue' in route.
- return lpmerge(self.expand_destination(route), options)
- if 'queue' not in options:
- options = lpmerge(self.expand_destination(
- self.app.conf.CELERY_DEFAULT_QUEUE), options)
- return options
- def expand_destination(self, route):
- # Route can be a queue name: convenient for direct exchanges.
- if isinstance(route, basestring):
- queue, route = route, {}
- else:
- # can use defaults from configured queue, but override specific
- # things (like the routing_key): great for topic exchanges.
- queue = route.pop('queue', None)
- if queue:
- try:
- Q = self.queues[queue] # noqa
- except KeyError:
- if not self.create_missing:
- raise QueueNotFound(
- 'Queue %r is not defined in CELERY_QUEUES' % queue)
- for key in 'exchange', 'routing_key':
- if route.get(key) is None:
- route[key] = queue
- Q = self.app.amqp.queues.add(queue, **route)
- # needs to be declared by publisher
- route['queue'] = Q
- return route
- def lookup_route(self, task, args=None, kwargs=None):
- return _first_route(self.routes, task, args, kwargs)
- def prepare(routes):
- """Expands the :setting:`CELERY_ROUTES` setting."""
- def expand_route(route):
- if isinstance(route, dict):
- return MapRoute(route)
- if isinstance(route, basestring):
- return mpromise(instantiate, route)
- return route
- if routes is None:
- return ()
- if not isinstance(routes, (list, tuple)):
- routes = (routes, )
- return map(expand_route, routes)
|