Ver código fonte

Routers can now be functions taking (name, args, kwargs, options, task=None)

Ask Solem 8 anos atrás
pai
commit
55275d5a39

+ 3 - 2
celery/app/base.py

@@ -629,7 +629,7 @@ class Celery(object):
                   add_to_parent=True, group_id=None, retries=0, chord=None,
                   add_to_parent=True, group_id=None, retries=0, chord=None,
                   reply_to=None, time_limit=None, soft_time_limit=None,
                   reply_to=None, time_limit=None, soft_time_limit=None,
                   root_id=None, parent_id=None, route_name=None,
                   root_id=None, parent_id=None, route_name=None,
-                  shadow=None, chain=None, **options):
+                  shadow=None, chain=None, task_type=None, **options):
         """Send task by name.
         """Send task by name.
 
 
         :param name: Name of task to call (e.g. `"tasks.add"`).
         :param name: Name of task to call (e.g. `"tasks.add"`).
@@ -649,7 +649,8 @@ class Celery(object):
             warnings.warn(AlwaysEagerIgnored(
             warnings.warn(AlwaysEagerIgnored(
                 'task_always_eager has no effect on send_task',
                 'task_always_eager has no effect on send_task',
             ), stacklevel=2)
             ), stacklevel=2)
-        options = router.route(options, route_name or name, args, kwargs)
+        options = router.route(
+            options, route_name or name, args, kwargs, task_type)
 
 
         if root_id is None:
         if root_id is None:
             parent, have_parent = self.current_worker_task, True
             parent, have_parent = self.current_worker_task, True

+ 31 - 19
celery/app/routes.py

@@ -19,20 +19,12 @@ from kombu import Queue
 from celery.exceptions import QueueNotFound
 from celery.exceptions import QueueNotFound
 from celery.five import items, string_t
 from celery.five import items, string_t
 from celery.utils.collections import lpmerge
 from celery.utils.collections import lpmerge
-from celery.utils.functional import firstmethod, fun_takes_argument, mlazy
-from celery.utils.imports import instantiate
+from celery.utils.functional import maybe_evaluate, mlazy
+from celery.utils.imports import symbol_by_name
 
 
 __all__ = ['MapRoute', 'Router', 'prepare']
 __all__ = ['MapRoute', 'Router', 'prepare']
 
 
 
 
-def _try_route(meth, task, args, kwargs, options=None):
-    if fun_takes_argument('options', meth, position=4):
-        return meth(task, args, kwargs, options)
-    return meth(task, args, kwargs)
-
-_first_route = firstmethod('route_for_task')
-
-
 def glob_to_re(glob, quote=string.punctuation.replace('*', '')):
 def glob_to_re(glob, quote=string.punctuation.replace('*', '')):
     glob = ''.join('\\' + c if c in quote else c for c in glob)
     glob = ''.join('\\' + c if c in quote else c for c in glob)
     return glob.replace('*', '.+?')
     return glob.replace('*', '.+?')
@@ -53,15 +45,15 @@ class MapRoute(object):
             else:
             else:
                 self.map[k] = v
                 self.map[k] = v
 
 
-    def route_for_task(self, task, *args, **kwargs):
+    def __call__(self, name, *args, **kwargs):
         try:
         try:
-            return dict(self.map[task])
+            return dict(self.map[name])
         except KeyError:
         except KeyError:
             pass
             pass
         except ValueError:
         except ValueError:
-            return {'queue': self.map[task]}
+            return {'queue': self.map[name]}
         for regex, route in items(self.patterns):
         for regex, route in items(self.patterns):
-            if regex.match(task):
+            if regex.match(name):
                 try:
                 try:
                     return dict(route)
                     return dict(route)
                 except ValueError:
                 except ValueError:
@@ -77,10 +69,10 @@ class Router(object):
         self.routes = [] if routes is None else routes
         self.routes = [] if routes is None else routes
         self.create_missing = create_missing
         self.create_missing = create_missing
 
 
-    def route(self, options, task, args=(), kwargs={}):
+    def route(self, options, name, args=(), kwargs={}, task_type=None):
         options = self.expand_destination(options)  # expands 'queue'
         options = self.expand_destination(options)  # expands 'queue'
         if self.routes:
         if self.routes:
-            route = self.lookup_route(task, args, kwargs, options)
+            route = self.lookup_route(name, args, kwargs, options, task_type)
             if route:  # expands 'queue' in route.
             if route:  # expands 'queue' in route.
                 return lpmerge(self.expand_destination(route), options)
                 return lpmerge(self.expand_destination(route), options)
         if 'queue' not in options:
         if 'queue' not in options:
@@ -108,8 +100,28 @@ class Router(object):
                         'Queue {0!r} missing from task_queues'.format(queue))
                         'Queue {0!r} missing from task_queues'.format(queue))
         return route
         return route
 
 
-    def lookup_route(self, task, args=None, kwargs=None, options=None):
-        return _first_route(self.routes, task, args, kwargs, options)
+    def lookup_route(self, name,
+                     args=None, kwargs=None, options=None, task_type=None):
+        query = self.query_router
+        for router in self.routes:
+            route = query(router, name, args, kwargs, options, task_type)
+            if route is not None:
+                return route
+
+    def query_router(self, router, task, args, kwargs, options, task_type):
+        router = maybe_evaluate(router)
+        if hasattr(router, 'route_for_task'):
+            # pre 4.0 router class
+            return router.route_for_task(task, args, kwargs)
+        return router(task, args, kwargs, options, task=task_type)
+
+
+def expand_router_string(router):
+    router = symbol_by_name(router)
+    if hasattr(router, 'route_for_task'):
+        # need to instantiate pre 4.0 router classes
+        router = router()
+    return router
 
 
 
 
 def prepare(routes):
 def prepare(routes):
@@ -119,7 +131,7 @@ def prepare(routes):
         if isinstance(route, (Mapping, list, tuple)):
         if isinstance(route, (Mapping, list, tuple)):
             return MapRoute(route)
             return MapRoute(route)
         if isinstance(route, string_t):
         if isinstance(route, string_t):
-            return mlazy(instantiate, route)
+            return mlazy(expand_router_string, route)
         return route
         return route
 
 
     if routes is None:
     if routes is None:

+ 1 - 1
celery/app/task.py

@@ -496,7 +496,7 @@ class Task(object):
         return app.send_task(
         return app.send_task(
             self.name, args, kwargs, task_id=task_id, producer=producer,
             self.name, args, kwargs, task_id=task_id, producer=producer,
             link=link, link_error=link_error, result_cls=self.AsyncResult,
             link=link, link_error=link_error, result_cls=self.AsyncResult,
-            shadow=shadow,
+            shadow=shadow, task_type=self,
             **options
             **options
         )
         )
 
 

+ 79 - 33
celery/tests/app/test_routes.py

@@ -5,9 +5,9 @@ from kombu.utils.functional import maybe_evaluate
 
 
 from celery.app import routes
 from celery.app import routes
 from celery.exceptions import QueueNotFound
 from celery.exceptions import QueueNotFound
-from celery.utils.functional import LRUCache
+from celery.utils.imports import qualname
 
 
-from celery.tests.case import AppCase
+from celery.tests.case import ANY, AppCase, Mock
 
 
 
 
 def Router(app, *args, **kwargs):
 def Router(app, *args, **kwargs):
@@ -45,10 +45,21 @@ class RouteCase(AppCase):
         }
         }
 
 
         @self.app.task(shared=False)
         @self.app.task(shared=False)
-        def mytask():
+        def mytask(*args, **kwargs):
             pass
             pass
         self.mytask = mytask
         self.mytask = mytask
 
 
+    def assert_routes_to_queue(self, queue, router, name,
+                               args=[], kwargs={}, options={}):
+        self.assertEqual(
+            router.route(options, name, args, kwargs)['queue'].name,
+            queue,
+        )
+
+    def assert_routes_to_default_queue(self, router, name, *args, **kwargs):
+        self.assert_routes_to_queue(
+            self.app.conf.task_default_queue, router, name, *args, **kwargs)
+
 
 
 class test_MapRoute(RouteCase):
 class test_MapRoute(RouteCase):
 
 
@@ -57,10 +68,10 @@ class test_MapRoute(RouteCase):
         expand = E(self.app, self.app.amqp.queues)
         expand = E(self.app, self.app.amqp.queues)
         route = routes.MapRoute({self.mytask.name: {'queue': 'foo'}})
         route = routes.MapRoute({self.mytask.name: {'queue': 'foo'}})
         self.assertEqual(
         self.assertEqual(
-            expand(route.route_for_task(self.mytask.name))['queue'].name,
+            expand(route(self.mytask.name))['queue'].name,
             'foo',
             'foo',
         )
         )
-        self.assertIsNone(route.route_for_task('celery.awesome'))
+        self.assertIsNone(route('celery.awesome'))
 
 
     def test_route_for_task(self):
     def test_route_for_task(self):
         set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
         set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
@@ -68,33 +79,27 @@ class test_MapRoute(RouteCase):
         route = routes.MapRoute({self.mytask.name: self.b_queue})
         route = routes.MapRoute({self.mytask.name: self.b_queue})
         self.assertDictContainsSubset(
         self.assertDictContainsSubset(
             self.b_queue,
             self.b_queue,
-            expand(route.route_for_task(self.mytask.name)),
+            expand(route(self.mytask.name)),
         )
         )
-        self.assertIsNone(route.route_for_task('celery.awesome'))
+        self.assertIsNone(route('celery.awesome'))
 
 
     def test_route_for_task__glob(self):
     def test_route_for_task__glob(self):
         route = routes.MapRoute([
         route = routes.MapRoute([
             ('proj.tasks.*', 'routeA'),
             ('proj.tasks.*', 'routeA'),
             ('demoapp.tasks.bar.*', {'exchange': 'routeB'}),
             ('demoapp.tasks.bar.*', {'exchange': 'routeB'}),
         ])
         ])
-        self.assertDictEqual(
-            route.route_for_task('proj.tasks.foo'),
-            {'queue': 'routeA'},
-        )
-        self.assertDictEqual(
-            route.route_for_task('demoapp.tasks.bar.moo'),
-            {'exchange': 'routeB'},
-        )
-        self.assertIsNone(
-            route.route_for_task('demoapp.foo.bar.moo'),
-        )
+        self.assertDictEqual(route('proj.tasks.foo'), {'queue': 'routeA'})
+        self.assertDictEqual(route('demoapp.tasks.bar.moo'), {
+            'exchange': 'routeB',
+        })
+        self.assertIsNone(route('demoapp.foo.bar.moo'))
 
 
     def test_expand_route_not_found(self):
     def test_expand_route_not_found(self):
         expand = E(self.app, self.app.amqp.Queues(
         expand = E(self.app, self.app.amqp.Queues(
                    self.app.conf.task_queues, False))
                    self.app.conf.task_queues, False))
         route = routes.MapRoute({'a': {'queue': 'x'}})
         route = routes.MapRoute({'a': {'queue': 'x'}})
         with self.assertRaises(QueueNotFound):
         with self.assertRaises(QueueNotFound):
-            expand(route.route_for_task('a'))
+            expand(route('a'))
 
 
 
 
 class test_lookup_route(RouteCase):
 class test_lookup_route(RouteCase):
@@ -108,8 +113,7 @@ class test_lookup_route(RouteCase):
         R = routes.prepare(({self.mytask.name: {'queue': 'bar'}},
         R = routes.prepare(({self.mytask.name: {'queue': 'bar'}},
                             {self.mytask.name: {'queue': 'foo'}}))
                             {self.mytask.name: {'queue': 'foo'}}))
         router = Router(self.app, R, self.app.amqp.queues)
         router = Router(self.app, R, self.app.amqp.queues)
-        self.assertEqual(router.route({}, self.mytask.name,
-                         args=[1, 2], kwargs={})['queue'].name, 'bar')
+        self.assert_routes_to_queue('bar', router, self.mytask.name)
 
 
     def test_expands_queue_in_options(self):
     def test_expands_queue_in_options(self):
         set_queues(self.app)
         set_queues(self.app)
@@ -145,32 +149,74 @@ class test_lookup_route(RouteCase):
         self.assertIs(dest['queue'], queue)
         self.assertIs(dest['queue'], queue)
 
 
     def test_lookup_paths_traversed(self):
     def test_lookup_paths_traversed(self):
-        set_queues(
-            self.app, foo=self.a_queue, bar=self.b_queue,
-            **{self.app.conf.task_default_queue: self.d_queue}
-        )
+        self.simple_queue_setup()
         R = routes.prepare((
         R = routes.prepare((
             {'celery.xaza': {'queue': 'bar'}},
             {'celery.xaza': {'queue': 'bar'}},
             {self.mytask.name: {'queue': 'foo'}}
             {self.mytask.name: {'queue': 'foo'}}
         ))
         ))
         router = Router(self.app, R, self.app.amqp.queues)
         router = Router(self.app, R, self.app.amqp.queues)
-        self.assertEqual(router.route({}, self.mytask.name,
-                         args=[1, 2], kwargs={})['queue'].name, 'foo')
-        self.assertEqual(
-            router.route({}, 'celery.poza')['queue'].name,
-            self.app.conf.task_default_queue,
+        self.assert_routes_to_queue('foo', router, self.mytask.name)
+        self.assert_routes_to_default_queue(router, 'celery.poza')
+
+    def test_compat_router_class(self):
+        self.simple_queue_setup()
+        R = routes.prepare((
+            TestRouter(),
+        ))
+        router = Router(self.app, R, self.app.amqp.queues)
+        self.assert_routes_to_queue('bar', router, 'celery.xaza')
+        self.assert_routes_to_default_queue(router, 'celery.poza')
+
+    def test_router_fun__called_with(self):
+        self.simple_queue_setup()
+        step = Mock(spec=['__call__'])
+        step.return_value = None
+        R = routes.prepare([step])
+        router = Router(self.app, R, self.app.amqp.queues)
+        self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3)
+        step.assert_called_with(
+            self.mytask.name, (2, 2), {'kw': 3}, ANY,
+            task=self.mytask,
+        )
+        options = step.call_args[0][3]
+        self.assertEqual(options['priority'], 3)
+
+    def test_compat_router_classes__called_with(self):
+        self.simple_queue_setup()
+        step = Mock(spec=['route_for_task'])
+        step.route_for_task.return_value = None
+        R = routes.prepare([step])
+        router = Router(self.app, R, self.app.amqp.queues)
+        self.mytask.apply_async((2, 2), {'kw': 3}, router=router, priority=3)
+        step.route_for_task.assert_called_with(
+            self.mytask.name, (2, 2), {'kw': 3},
         )
         )
 
 
+    def simple_queue_setup(self):
+        set_queues(
+            self.app, foo=self.a_queue, bar=self.b_queue,
+            **{self.app.conf.task_default_queue: self.d_queue})
+
+
+class TestRouter(object):
+
+    def route_for_task(self, task, args, kwargs):
+        if task == 'celery.xaza':
+            return 'bar'
+
 
 
 class test_prepare(AppCase):
 class test_prepare(AppCase):
 
 
     def test_prepare(self):
     def test_prepare(self):
         o = object()
         o = object()
-        R = [{'foo': 'bar'},
-             'celery.utils.functional.LRUCache', o]
+        R = [
+            {'foo': 'bar'},
+            qualname(TestRouter),
+            o,
+        ]
         p = routes.prepare(R)
         p = routes.prepare(R)
         self.assertIsInstance(p[0], routes.MapRoute)
         self.assertIsInstance(p[0], routes.MapRoute)
-        self.assertIsInstance(maybe_evaluate(p[1]), LRUCache)
+        self.assertIsInstance(maybe_evaluate(p[1]), TestRouter)
         self.assertIs(p[2], o)
         self.assertIs(p[2], o)
 
 
         self.assertEqual(routes.prepare(o), [o])
         self.assertEqual(routes.prepare(o), [o])

+ 7 - 8
docs/configuration.rst

@@ -1251,8 +1251,9 @@ in order.
 
 
 A router can be specified as either:
 A router can be specified as either:
 
 
-*  A router class instance.
-*  A string which provides the path to a router class
+*  A function with the signature ``(name, args, kwargs,
+   options, task=None, **kwargs)``
+*  A string which provides the path to a router function.
 *  A dict containing router specification:
 *  A dict containing router specification:
      Will be converted to a :class:`celery.routes.MapRoute` instance.
      Will be converted to a :class:`celery.routes.MapRoute` instance.
 * A list of ``(pattern, route)`` tuples:
 * A list of ``(pattern, route)`` tuples:
@@ -1274,19 +1275,17 @@ Examples:
         },
         },
     }
     }
 
 
-    task_routes = ('myapp.tasks.Router', {'celery.ping': 'default})
+    task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default})
 
 
-Where ``myapp.tasks.Router`` could be:
+Where ``myapp.tasks.route_task`` could be:
 
 
 .. code-block:: python
 .. code-block:: python
 
 
-    class Router(object):
-
-        def route_for_task(self, task, args=None, kwargs=None):
+    def route_task(self, name, args, kwargs, options, task=None, **kwargs):
             if task == 'celery.ping':
             if task == 'celery.ping':
                 return {'queue': 'default'}
                 return {'queue': 'default'}
 
 
-``route_for_task`` may return a string or a dict. A string then means
+``route_task`` may return a string or a dict. A string then means
 it's a queue name in :setting:`task_queues`, a dict means it's a custom route.
 it's a queue name in :setting:`task_queues`, a dict means it's a custom route.
 
 
 When sending tasks, the routers are consulted in order. The first
 When sending tasks, the routers are consulted in order. The first

+ 28 - 15
docs/userguide/routing.rst

@@ -573,21 +573,18 @@ as task attributes.
 Routers
 Routers
 -------
 -------
 
 
-A router is a class that decides the routing options for a task.
+A router is a function that decides the routing options for a task.
 
 
-All you need to define a new router is to create a class with a
-``route_for_task`` method:
+All you need to define a new router is to define a function with
+the signature ``(name, args, kwargs, options, task=None, **kwargs)``:
 
 
 .. code-block:: python
 .. code-block:: python
 
 
-    class MyRouter(object):
-
-        def route_for_task(self, task, args=None, kwargs=None):
-            if task == 'myapp.tasks.compress_video':
+    def route_task(name, args, kwargs, options, task=None, **kwargs):
+            if name == 'myapp.tasks.compress_video':
                 return {'exchange': 'video',
                 return {'exchange': 'video',
                         'exchange_type': 'topic',
                         'exchange_type': 'topic',
                         'routing_key': 'video.compress'}
                         'routing_key': 'video.compress'}
-            return None
 
 
 If you return the ``queue`` key, it will expand with the defined settings of
 If you return the ``queue`` key, it will expand with the defined settings of
 that queue in :setting:`task_queues`:
 that queue in :setting:`task_queues`:
@@ -611,13 +608,13 @@ setting:
 
 
 .. code-block:: python
 .. code-block:: python
 
 
-    task_routes = (MyRouter(),)
+    task_routes = (route_task,)
 
 
-Router classes can also be added by name:
+Router functions can also be added by name:
 
 
 .. code-block:: python
 .. code-block:: python
 
 
-    task_routes = ('myapp.routers.MyRouter',)
+    task_routes = ('myapp.routers.route_task',)
 
 
 
 
 For simple task name -> route mappings like the router example above,
 For simple task name -> route mappings like the router example above,
@@ -626,16 +623,32 @@ same behavior:
 
 
 .. code-block:: python
 .. code-block:: python
 
 
-    task_routes = (
-        {'myapp.tasks.compress_video': {
+    task_routes = {
+        'myapp.tasks.compress_video': {
             'queue': 'video',
             'queue': 'video',
             'routing_key': 'video.compress',
             'routing_key': 'video.compress',
-        }},
-    )
+        },
+    }
 
 
 The routers will then be traversed in order, it will stop at the first router
 The routers will then be traversed in order, it will stop at the first router
 returning a true value, and use that as the final route for the task.
 returning a true value, and use that as the final route for the task.
 
 
+You can also have multiple routers defined in a sequence:
+
+.. code-block:: python
+
+    task_routes = [
+        route_task,
+        {
+            'myapp.tasks.compress_video': {
+                'queue': 'video',
+                'routing_key': 'video.compress',
+        },
+    ]
+
+The routers will then be visited in turn, and the first to return
+a value will be chosen.
+
 Broadcast
 Broadcast
 ---------
 ---------
 
 

+ 38 - 5
docs/whatsnew-4.0.rst

@@ -770,6 +770,7 @@ will "accumulate" the results of the group tasks.
 A new built-in task (`celery.accumulate` was added for this purpose)
 A new built-in task (`celery.accumulate` was added for this purpose)
 
 
 Closes #817
 Closes #817
+
 Optimized Beat implementation
 Optimized Beat implementation
 =============================
 =============================
 
 
@@ -811,6 +812,43 @@ eventlet/gevent drainers, promises, BLA BLA
 
 
 Closed issue #2529.
 Closed issue #2529.
 
 
+New Task Router API
+===================
+
+The :setting:`task_routes` setting can now hold functions, and map routes
+now support glob patterns and regexes.
+
+Instead of using router classes you can now simply define a function:
+
+.. code-block:: python
+
+    def route_for_task(name, args, kwargs, options, task=None, **kwargs):
+        from proj import tasks
+
+        if name == tasks.add.name:
+            return {'queue': 'hipri'}
+
+If you don't need the arguments you can use start arguments, just make
+sure you always also accept star arguments so that we have the ability
+to add more features in the future:
+
+.. code-block:: python
+
+    def route_for_task(name, *args, **kwargs):
+        from proj import tasks
+        if name == tasks.add.name:
+            return {'queue': 'hipri', 'priority': 9}
+
+Both the ``options`` argument and the new ``task`` keyword argument
+are new to the function-style routers, and will make it easier to write
+routers based on execution options, or properties of the task.
+
+The optional ``task`` keyword argument will not be set if a task is called
+by name using :meth:`@send_task`.
+
+For more examples, including using glob/regexes in routers please see
+:setting:`task_routes` and :ref:`routing-automatic`.
+
 In Other News
 In Other News
 -------------
 -------------
 
 
@@ -827,11 +865,6 @@ In Other News
 
 
   This increases performance as it completely bypasses the routing table.
   This increases performance as it completely bypasses the routing table.
 
 
-- **Tasks**: :setting:`task_routes` can now contain glob patterns and
-  regexes.
-
-    See new examples in :setting:`task_routes` and :ref:`routing-automatic`.
-
 - **Eventlet/Gevent**: Fixed race condition leading to "simultaneous read"
 - **Eventlet/Gevent**: Fixed race condition leading to "simultaneous read"
   errors (Issue #2812).
   errors (Issue #2812).