routes.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. # -*- coding: utf-8 -*-
  2. """Task Routing.
  3. Contains utilities for working with task routers, (:setting:`task_routes`).
  4. """
  5. from __future__ import absolute_import, unicode_literals
  6. import re
  7. import string
  8. from collections import Mapping, OrderedDict
  9. from kombu import Queue
  10. from celery.exceptions import QueueNotFound
  11. from celery.five import items, string_t
  12. from celery.utils.collections import lpmerge
  13. from celery.utils.functional import maybe_evaluate, mlazy
  14. from celery.utils.imports import symbol_by_name
  15. __all__ = ['MapRoute', 'Router', 'prepare']
  16. def glob_to_re(glob, quote=string.punctuation.replace('*', '')):
  17. glob = ''.join('\\' + c if c in quote else c for c in glob)
  18. return glob.replace('*', '.+?')
  19. class MapRoute(object):
  20. """Creates a router out of a :class:`dict`."""
  21. def __init__(self, map):
  22. map = items(map) if isinstance(map, Mapping) else map
  23. self.map = {}
  24. self.patterns = OrderedDict()
  25. for k, v in map:
  26. if isinstance(k, re._pattern_type):
  27. self.patterns[k] = v
  28. elif '*' in k:
  29. self.patterns[re.compile(glob_to_re(k))] = v
  30. else:
  31. self.map[k] = v
  32. def __call__(self, name, *args, **kwargs):
  33. try:
  34. return dict(self.map[name])
  35. except KeyError:
  36. pass
  37. except ValueError:
  38. return {'queue': self.map[name]}
  39. for regex, route in items(self.patterns):
  40. if regex.match(name):
  41. try:
  42. return dict(route)
  43. except ValueError:
  44. return {'queue': route}
  45. class Router(object):
  46. """Route tasks based on the :setting:`task_routes` setting."""
  47. def __init__(self, routes=None, queues=None,
  48. create_missing=False, app=None):
  49. self.app = app
  50. self.queues = {} if queues is None else queues
  51. self.routes = [] if routes is None else routes
  52. self.create_missing = create_missing
  53. def route(self, options, name, args=(), kwargs={}, task_type=None):
  54. options = self.expand_destination(options) # expands 'queue'
  55. if self.routes:
  56. route = self.lookup_route(name, args, kwargs, options, task_type)
  57. if route: # expands 'queue' in route.
  58. return lpmerge(self.expand_destination(route), options)
  59. if 'queue' not in options:
  60. options = lpmerge(self.expand_destination(
  61. self.app.conf.task_default_queue), options)
  62. return options
  63. def expand_destination(self, route):
  64. # Route can be a queue name: convenient for direct exchanges.
  65. if isinstance(route, string_t):
  66. queue, route = route, {}
  67. else:
  68. # can use defaults from configured queue, but override specific
  69. # things (like the routing_key): great for topic exchanges.
  70. queue = route.pop('queue', None)
  71. if queue:
  72. if isinstance(queue, Queue):
  73. route['queue'] = queue
  74. else:
  75. try:
  76. route['queue'] = self.queues[queue]
  77. except KeyError:
  78. raise QueueNotFound(
  79. 'Queue {0!r} missing from task_queues'.format(queue))
  80. return route
  81. def lookup_route(self, name,
  82. args=None, kwargs=None, options=None, task_type=None):
  83. query = self.query_router
  84. for router in self.routes:
  85. route = query(router, name, args, kwargs, options, task_type)
  86. if route is not None:
  87. return route
  88. def query_router(self, router, task, args, kwargs, options, task_type):
  89. router = maybe_evaluate(router)
  90. if hasattr(router, 'route_for_task'):
  91. # pre 4.0 router class
  92. return router.route_for_task(task, args, kwargs)
  93. return router(task, args, kwargs, options, task=task_type)
  94. def expand_router_string(router):
  95. router = symbol_by_name(router)
  96. if hasattr(router, 'route_for_task'):
  97. # need to instantiate pre 4.0 router classes
  98. router = router()
  99. return router
  100. def prepare(routes):
  101. """Expand the :setting:`task_routes` setting."""
  102. def expand_route(route):
  103. if isinstance(route, (Mapping, list, tuple)):
  104. return MapRoute(route)
  105. if isinstance(route, string_t):
  106. return mlazy(expand_router_string, route)
  107. return route
  108. if routes is None:
  109. return ()
  110. if not isinstance(routes, (list, tuple)):
  111. routes = (routes,)
  112. return [expand_route(route) for route in routes]