Browse Source

task_routes entries can now be glob patterns or even regular expressions. Closes #1137

Ask Solem 9 years ago
parent
commit
02f95470a7
5 changed files with 112 additions and 38 deletions
  1. 28 3
      celery/app/routes.py
  2. 17 0
      celery/tests/app/test_routes.py
  3. 39 34
      docs/configuration.rst
  4. 22 1
      docs/userguide/routing.rst
  5. 6 0
      docs/whatsnew-4.0.rst

+ 28 - 3
celery/app/routes.py

@@ -9,10 +9,15 @@
 """
 from __future__ import absolute_import
 
+import re
+import string
+
+from collections import Mapping, OrderedDict
+
 from kombu import Queue
 
 from celery.exceptions import QueueNotFound
-from celery.five import string_t
+from celery.five import items, string_t
 from celery.utils import lpmerge
 from celery.utils.functional import firstmethod, mlazy
 from celery.utils.imports import instantiate
@@ -22,11 +27,25 @@ __all__ = ['MapRoute', 'Router', 'prepare']
 _first_route = firstmethod('route_for_task')
 
 
+def glob_to_re(glob, quote=string.punctuation.replace('*', '')):
+    glob = ''.join('\\' + c if c in quote else c for c in glob)
+    return glob.replace('*', '.+?')
+
+
 class MapRoute(object):
     """Creates a router out of a :class:`dict`."""
 
     def __init__(self, map):
-        self.map = map
+        map = items(map) if isinstance(map, Mapping) else map
+        self.map = {}
+        self.patterns = OrderedDict()
+        for k, v in map:
+            if isinstance(k, re._pattern_type):
+                self.patterns[k] = v
+            elif '*' in k:
+                self.patterns[re.compile(glob_to_re(k))] = v
+            else:
+                self.map[k] = v
 
     def route_for_task(self, task, *args, **kwargs):
         try:
@@ -35,6 +54,12 @@ class MapRoute(object):
             pass
         except ValueError:
             return {'queue': self.map[task]}
+        for regex, route in items(self.patterns):
+            if regex.match(task):
+                try:
+                    return dict(route)
+                except ValueError:
+                    return {'queue': route}
 
 
 class Router(object):
@@ -85,7 +110,7 @@ def prepare(routes):
     """Expands the :setting:`task_routes` setting."""
 
     def expand_route(route):
-        if isinstance(route, dict):
+        if isinstance(route, (Mapping, list, tuple)):
             return MapRoute(route)
         if isinstance(route, string_t):
             return mlazy(instantiate, route)

+ 17 - 0
celery/tests/app/test_routes.py

@@ -72,6 +72,23 @@ class test_MapRoute(RouteCase):
         )
         self.assertIsNone(route.route_for_task('celery.awesome'))
 
+    def test_route_for_task__glob(self):
+        route = routes.MapRoute([
+            ('proj.tasks.*', 'routeA'),
+            ('demoapp.tasks.bar.*', {'exchange': 'routeB'}),
+        ])
+        self.assertDictEqual(
+            route.route_for_task('proj.tasks.foo'),
+            {'queue': 'routeA'},
+        )
+        self.assertDictEqual(
+            route.route_for_task('demoapp.tasks.bar.moo'),
+            {'exchange': 'routeB'},
+        )
+        self.assertIsNone(
+            route.route_for_task('demoapp.foo.bar.moo'),
+        )
+
     def test_expand_route_not_found(self):
         expand = E(self.app, self.app.amqp.Queues(
                    self.app.conf.task_queues, False))

+ 39 - 34
docs/configuration.rst

@@ -128,7 +128,7 @@ rush in moving to the new settings format.
 ``CELERY_DEFAULT_QUEUE``               :setting:`task_default_queue`
 ``CELERY_DEFAULT_RATE_LIMIT``          :setting:`task_default_rate_limit`
 ``CELERY_DEFAULT_ROUTING_KEY``         :setting:`task_default_routing_key`
-``-"-_EAGER_PROPAGATES_EXCEPTIONS``    :setting:`task_eager_propagates`
+``-'-_EAGER_PROPAGATES_EXCEPTIONS``    :setting:`task_eager_propagates`
 ``CELERY_IGNORE_RESULT``               :setting:`task_ignore_result`
 ``CELERY_TASK_PUBLISH_RETRY``          :setting:`task_publish_retry`
 ``CELERY_TASK_PUBLISH_RETRY_POLICY``   :setting:`task_publish_retry_policy`
@@ -389,10 +389,10 @@ If set, the worker stores all task errors in the result store even if
 task_track_started
 ~~~~~~~~~~~~~~~~~~
 
-If :const:`True` the task will report its status as "started" when the
+If :const:`True` the task will report its status as 'started' when the
 task is executed by a worker.  The default value is :const:`False` as
 the normal behaviour is to not report that level of granularity.  Tasks
-are either pending, finished, or waiting to be retried.  Having a "started"
+are either pending, finished, or waiting to be retried.  Having a 'started'
 state can be useful for when there are long running tasks and there is a
 need to report which task is currently running.
 
@@ -599,7 +599,7 @@ Default is to expire after 1 day.
 result_cache_max
 ~~~~~~~~~~~~~~~~
 
-Enables client caching of results, which can be useful for the old "amqp"
+Enables client caching of results, which can be useful for the old 'amqp'
 backend where the result is unavailable as soon as one result instance
 consumes it.
 
@@ -1041,21 +1041,21 @@ Riak backend settings
 This backend requires the :setting:`result_backend`
 setting to be set to a Riak URL::
 
-    result_backend = "riak://host:port/bucket"
+    result_backend = 'riak://host:port/bucket'
 
 For example::
 
-    result_backend = "riak://localhost/celery
+    result_backend = 'riak://localhost/celery
 
 which is the same as::
 
-    result_backend = "riak://"
+    result_backend = 'riak://'
 
 The fields of the URL are defined as follows:
 
 - *host*
 
-Host name or IP address of the Riak server. e.g. `"localhost"`.
+Host name or IP address of the Riak server. e.g. `'localhost'`.
 
 - *port*
 
@@ -1307,25 +1307,30 @@ in order.
 
 A router can be specified as either:
 
-*  A router class instances
+*  A router class instance.
 *  A string which provides the path to a router class
-*  A dict containing router specification. It will be converted to a :class:`celery.routes.MapRoute` instance.
+*  A dict containing router specification:
+     Will be converted to a :class:`celery.routes.MapRoute` instance.
+* A list of ``(pattern, route)`` tuples:
+     Will be converted to a :class:`celery.routes.MapRoute` instance.
 
 Examples:
 
 .. code-block:: python
 
     task_routes = {
-        "celery.ping": "default",
-        "mytasks.add": "cpu-bound",
-        "video.encode": {
-            "queue": "video",
-            "exchange": "media"
-            "routing_key": "media.video.encode",
+        'celery.ping': 'default',
+        'mytasks.add': 'cpu-bound',
+        'feed.tasks.*': 'feeds',                           # <-- glob pattern
+        re.compile(r'(image|video)\.tasks\..*'): 'media',  # <-- regex
+        'video.encode': {
+            'queue': 'video',
+            'exchange': 'media'
+            'routing_key': 'media.video.encode',
         },
     }
 
-    task_routes = ("myapp.tasks.Router", {"celery.ping": "default})
+    task_routes = ('myapp.tasks.Router', {'celery.ping': 'default})
 
 Where ``myapp.tasks.Router`` could be:
 
@@ -1334,8 +1339,8 @@ Where ``myapp.tasks.Router`` could be:
     class Router(object):
 
         def route_for_task(self, task, args=None, kwargs=None):
-            if task == "celery.ping":
-                return "default"
+            if task == 'celery.ping':
+                return {'queue': 'default'}
 
 ``route_for_task`` may return a string or a dict. A string then means
 it's a queue name in :setting:`task_queues`, a dict means it's a custom route.
@@ -1349,20 +1354,20 @@ Example if :func:`~celery.execute.apply_async` has these arguments:
 
 .. code-block:: python
 
-   Task.apply_async(immediate=False, exchange="video",
-                    routing_key="video.compress")
+   Task.apply_async(immediate=False, exchange='video',
+                    routing_key='video.compress')
 
 and a router returns:
 
 .. code-block:: python
 
-    {"immediate": True, "exchange": "urgent"}
+    {'immediate': True, 'exchange': 'urgent'}
 
 the final message options will be:
 
 .. code-block:: python
 
-    immediate=True, exchange="urgent", routing_key="video.compress"
+    immediate=True, exchange='urgent', routing_key='video.compress'
 
 (and any default message options defined in the
 :class:`~celery.task.base.Task` class)
@@ -1375,17 +1380,17 @@ With the follow settings:
 .. code-block:: python
 
     task_queues = {
-        "cpubound": {
-            "exchange": "cpubound",
-            "routing_key": "cpubound",
+        'cpubound': {
+            'exchange': 'cpubound',
+            'routing_key': 'cpubound',
         },
     }
 
     task_routes = {
-        "tasks.add": {
-            "queue": "cpubound",
-            "routing_key": "tasks.add",
-            "serializer": "json",
+        'tasks.add': {
+            'queue': 'cpubound',
+            'routing_key': 'tasks.add',
+            'serializer': 'json',
         },
     }
 
@@ -1393,9 +1398,9 @@ The final routing options for ``tasks.add`` will become:
 
 .. code-block:: javascript
 
-    {"exchange": "cpubound",
-     "routing_key": "tasks.add",
-     "serializer": "json"}
+    {'exchange': 'cpubound',
+     'routing_key': 'tasks.add',
+     'serializer': 'json'}
 
 See :ref:`routers` for more examples.
 
@@ -1970,7 +1975,7 @@ email_charset
 ~~~~~~~~~~~~~
 .. versionadded:: 4.0
 
-Charset for outgoing emails. Default is "utf-8".
+Charset for outgoing emails. Default is 'utf-8'.
 
 .. _conf-example-error-mail-config:
 

+ 22 - 1
docs/userguide/routing.rst

@@ -41,7 +41,28 @@ With this route enabled import feed tasks will be routed to the
 `"feeds"` queue, while all other tasks will be routed to the default queue
 (named `"celery"` for historical reasons).
 
-Now you can start server `z` to only process the feeds queue like this:
+Alternatively, you can use glob pattern matching, or even regular expressions,
+to match all tasks in the ``feed.tasks`` namespace::
+
+    task_routes = {'feed.tasks.*': {'queue': 'feeds'}}
+
+If the order in which the patterns are matched is important you should should
+specify a tuple as the task router instead::
+
+    task_routes = ([
+        ('feed.tasks.*': {'queue': 'feeds'}),
+        ('web.tasks.*': {'queue': 'web'}),
+        (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
+    ],)
+
+.. note::
+
+    The :setting:`task_routes` setting can either be a dictionary, or a
+    list of router objects, so in this case we need to specify the setting
+    as a tuple containing a list.
+
+After installing the router, you can start server `z` to only process the feeds
+queue like this:
 
 .. code-block:: console
 

+ 6 - 0
docs/whatsnew-4.0.rst

@@ -382,6 +382,12 @@ Task Autoretry Decorator
 
 Contributed by Dmitry Malinovsky.
 
+
+:setting:`task_routes` can now contain glob patterns and regexes.
+=================================================================
+
+See examples in :setting:`task_routes` and :ref:`routing-automatic`.
+
 In Other News
 -------------