routes.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.routes
  4. ~~~~~~~~~~~~~
  5. Contains utilities for working with task routes
  6. (:setting:`CELERY_ROUTES`).
  7. :copyright: (c) 2009 - 2011 by Ask Solem.
  8. :license: BSD, see LICENSE for more details.
  9. """
  10. from __future__ import absolute_import
  11. from .exceptions import QueueNotFound
  12. from .utils import firstmethod, instantiate, lpmerge, mpromise
  13. _first_route = firstmethod("route_for_task")
  14. class MapRoute(object):
  15. """Creates a router out of a :class:`dict`."""
  16. def __init__(self, map):
  17. self.map = map
  18. def route_for_task(self, task, *args, **kwargs):
  19. route = self.map.get(task)
  20. if route:
  21. return dict(route)
  22. class Router(object):
  23. def __init__(self, routes=None, queues=None, create_missing=False,
  24. app=None):
  25. from .app import app_or_default
  26. self.app = app_or_default(app)
  27. self.queues = {} if queues is None else queues
  28. self.routes = [] if routes is None else routes
  29. self.create_missing = create_missing
  30. def route(self, options, task, args=(), kwargs={}):
  31. options = self.expand_destination(options) # expands 'queue'
  32. if self.routes:
  33. route = self.lookup_route(task, args, kwargs)
  34. if route: # expands 'queue' in route.
  35. return lpmerge(self.expand_destination(route), options)
  36. if "queue" not in options:
  37. options = lpmerge(self.expand_destination(
  38. self.app.conf.CELERY_DEFAULT_QUEUE), options)
  39. return options
  40. def expand_destination(self, route):
  41. # Route can be a queue name: convenient for direct exchanges.
  42. if isinstance(route, basestring):
  43. queue, route = route, {}
  44. else:
  45. # can use defaults from configured queue, but override specific
  46. # things (like the routing_key): great for topic exchanges.
  47. queue = route.pop("queue", None)
  48. if queue: # expand config from configured queue.
  49. try:
  50. dest = dict(self.queues[queue])
  51. except KeyError:
  52. if not self.create_missing:
  53. raise QueueNotFound(
  54. "Queue %r is not defined in CELERY_QUEUES" % queue)
  55. dest = dict(self.app.amqp.queues.add(queue, queue, queue))
  56. # needs to be declared by publisher
  57. dest["queue"] = queue
  58. # routing_key and binding_key are synonyms.
  59. dest.setdefault("routing_key", dest.get("binding_key"))
  60. return lpmerge(dest, route)
  61. return route
  62. def lookup_route(self, task, args=None, kwargs=None):
  63. return _first_route(self.routes, task, args, kwargs)
  64. def prepare(routes):
  65. """Expands the :setting:`CELERY_ROUTES` setting."""
  66. def expand_route(route):
  67. if isinstance(route, dict):
  68. return MapRoute(route)
  69. if isinstance(route, basestring):
  70. return mpromise(instantiate, route)
  71. return route
  72. if routes is None:
  73. return ()
  74. if not isinstance(routes, (list, tuple)):
  75. routes = (routes, )
  76. return map(expand_route, routes)