routes.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. # -*- coding: utf-8 -*-
  2. """
celery.routes
~~~~~~~~~~~~~

Utilities for working with task routers
(:setting:`CELERY_ROUTES`).
  7. """
  8. from __future__ import absolute_import
  9. from celery.exceptions import QueueNotFound
  10. from celery.utils import lpmerge
  11. from celery.utils.functional import firstmethod, mpromise
  12. from celery.utils.imports import instantiate
# Callable built by ``firstmethod`` for the ``route_for_task`` method name;
# ``Router.lookup_route`` calls it with the router list followed by the task
# name, args and kwargs.
# NOTE(review): presumably returns the first non-empty ``route_for_task``
# result among the routers -- confirm against celery.utils.functional.
_first_route = firstmethod('route_for_task')
  14. class MapRoute(object):
  15. """Creates a router out of a :class:`dict`."""
  16. def __init__(self, map):
  17. self.map = map
  18. def route_for_task(self, task, *args, **kwargs):
  19. route = self.map.get(task)
  20. if route:
  21. return dict(route)
  22. class Router(object):
  23. def __init__(self, routes=None, queues=None, create_missing=False,
  24. app=None):
  25. self.app = app
  26. self.queues = {} if queues is None else queues
  27. self.routes = [] if routes is None else routes
  28. self.create_missing = create_missing
  29. def route(self, options, task, args=(), kwargs={}):
  30. options = self.expand_destination(options) # expands 'queue'
  31. if self.routes:
  32. route = self.lookup_route(task, args, kwargs)
  33. if route: # expands 'queue' in route.
  34. return lpmerge(self.expand_destination(route), options)
  35. if 'queue' not in options:
  36. options = lpmerge(self.expand_destination(
  37. self.app.conf.CELERY_DEFAULT_QUEUE), options)
  38. return options
  39. def expand_destination(self, route):
  40. # Route can be a queue name: convenient for direct exchanges.
  41. if isinstance(route, basestring):
  42. queue, route = route, {}
  43. else:
  44. # can use defaults from configured queue, but override specific
  45. # things (like the routing_key): great for topic exchanges.
  46. queue = route.pop('queue', None)
  47. if queue:
  48. try:
  49. Q = self.queues[queue] # noqa
  50. except KeyError:
  51. if not self.create_missing:
  52. raise QueueNotFound(
  53. 'Queue %r is not defined in CELERY_QUEUES' % queue)
  54. for key in 'exchange', 'routing_key':
  55. if route.get(key) is None:
  56. route[key] = queue
  57. Q = self.app.amqp.queues.add(queue, **route)
  58. # needs to be declared by publisher
  59. route['queue'] = Q
  60. return route
  61. def lookup_route(self, task, args=None, kwargs=None):
  62. return _first_route(self.routes, task, args, kwargs)
  63. def prepare(routes):
  64. """Expands the :setting:`CELERY_ROUTES` setting."""
  65. def expand_route(route):
  66. if isinstance(route, dict):
  67. return MapRoute(route)
  68. if isinstance(route, basestring):
  69. return mpromise(instantiate, route)
  70. return route
  71. if routes is None:
  72. return ()
  73. if not isinstance(routes, (list, tuple)):
  74. routes = (routes, )
  75. return map(expand_route, routes)