routes.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.routes
  4. ~~~~~~~~~~~~~
  5. Contains utilities for working with task routers,
  6. (:setting:`task_routes`).
  7. """
  8. from __future__ import absolute_import, unicode_literals
  9. import re
  10. import string
  11. from collections import Mapping, OrderedDict
  12. from kombu import Queue
  13. from celery.exceptions import QueueNotFound
  14. from celery.five import items, string_t
  15. from celery.utils.collections import lpmerge
  16. from celery.utils.functional import firstmethod, fun_takes_argument, mlazy
  17. from celery.utils.imports import instantiate
  18. __all__ = ['MapRoute', 'Router', 'prepare']
  19. def _try_route(meth, task, args, kwargs, options=None):
  20. if fun_takes_argument('options', meth, position=4):
  21. return meth(task, args, kwargs, options)
  22. return meth(task, args, kwargs)
  23. _first_route = firstmethod('route_for_task')
  24. def glob_to_re(glob, quote=string.punctuation.replace('*', '')):
  25. glob = ''.join('\\' + c if c in quote else c for c in glob)
  26. return glob.replace('*', '.+?')
  27. class MapRoute(object):
  28. """Creates a router out of a :class:`dict`."""
  29. def __init__(self, map):
  30. map = items(map) if isinstance(map, Mapping) else map
  31. self.map = {}
  32. self.patterns = OrderedDict()
  33. for k, v in map:
  34. if isinstance(k, re._pattern_type):
  35. self.patterns[k] = v
  36. elif '*' in k:
  37. self.patterns[re.compile(glob_to_re(k))] = v
  38. else:
  39. self.map[k] = v
  40. def route_for_task(self, task, *args, **kwargs):
  41. try:
  42. return dict(self.map[task])
  43. except KeyError:
  44. pass
  45. except ValueError:
  46. return {'queue': self.map[task]}
  47. for regex, route in items(self.patterns):
  48. if regex.match(task):
  49. try:
  50. return dict(route)
  51. except ValueError:
  52. return {'queue': route}
  53. class Router(object):
  54. def __init__(self, routes=None, queues=None,
  55. create_missing=False, app=None):
  56. self.app = app
  57. self.queues = {} if queues is None else queues
  58. self.routes = [] if routes is None else routes
  59. self.create_missing = create_missing
  60. def route(self, options, task, args=(), kwargs={}):
  61. options = self.expand_destination(options) # expands 'queue'
  62. if self.routes:
  63. route = self.lookup_route(task, args, kwargs, options)
  64. if route: # expands 'queue' in route.
  65. return lpmerge(self.expand_destination(route), options)
  66. if 'queue' not in options:
  67. options = lpmerge(self.expand_destination(
  68. self.app.conf.task_default_queue), options)
  69. return options
  70. def expand_destination(self, route):
  71. # Route can be a queue name: convenient for direct exchanges.
  72. if isinstance(route, string_t):
  73. queue, route = route, {}
  74. else:
  75. # can use defaults from configured queue, but override specific
  76. # things (like the routing_key): great for topic exchanges.
  77. queue = route.pop('queue', None)
  78. if queue:
  79. if isinstance(queue, Queue):
  80. route['queue'] = queue
  81. else:
  82. try:
  83. route['queue'] = self.queues[queue]
  84. except KeyError:
  85. raise QueueNotFound(
  86. 'Queue {0!r} missing from task_queues'.format(queue))
  87. return route
  88. def lookup_route(self, task, args=None, kwargs=None, options=None):
  89. return _first_route(self.routes, task, args, kwargs, options)
  90. def prepare(routes):
  91. """Expands the :setting:`task_routes` setting."""
  92. def expand_route(route):
  93. if isinstance(route, (Mapping, list, tuple)):
  94. return MapRoute(route)
  95. if isinstance(route, string_t):
  96. return mlazy(instantiate, route)
  97. return route
  98. if routes is None:
  99. return ()
  100. if not isinstance(routes, (list, tuple)):
  101. routes = (routes,)
  102. return [expand_route(route) for route in routes]