Forráskód Böngészése

Adds CELERY_ANNOTATIONS; Like CELERY_ROUTES but lets you change task attributes

Ask Solem 13 éve
szülő
commit
a0bd6d428a
4 módosított fájl, 75 hozzáadás és 0 törlés
  1. 14 0
      celery/app/__init__.py
  2. 1 0
      celery/app/defaults.py
  3. 3 0
      celery/app/task/__init__.py
  4. 57 0
      docs/configuration.rst

+ 14 - 0
celery/app/__init__.py

@@ -21,6 +21,7 @@ from inspect import getargspec
 from .. import registry
 from ..utils import cached_property, instantiate
 
+from . import annotations
 from . import base
 
 # Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
@@ -191,11 +192,24 @@ class App(base.BaseApp):
             return inner_create_task_cls(**options)(*args)
         return inner_create_task_cls(**options)
 
+    def annotate_task(self, task):
+        if self.annotations:
+            match = annotations._first_match(self.annotations, task)
+            for attr, value in (match or {}).iteritems():
+                setattr(task, attr, value)
+            match_any = annotations._first_match_any(self.annotations)
+            for attr, value in (match_any or {}).iteritems():
+                setattr(task, attr, value)
+
     @cached_property
     def Task(self):
         """Default Task base class for this application."""
         return self.create_task_cls()
 
+    @cached_property
+    def annotations(self):
+        return annotations.prepare(self.conf.CELERY_ANNOTATIONS)
+
     def __repr__(self):
         return "<Celery: %s:0x%x>" % (self.main or "__main__", id(self), )
 

+ 1 - 0
celery/app/defaults.py

@@ -88,6 +88,7 @@ NAMESPACES = {
                 alt="CELERY_TASK_RESULT_EXPIRES"),
         "AMQP_TASK_RESULT_CONNECTION_MAX": Option(1, type="int",
             remove_by="2.5", alt="BROKER_POOL_LIMIT"),
+        "ANNOTATIONS": Option(type="any"),
         "BROADCAST_QUEUE": Option("celeryctl"),
         "BROADCAST_EXCHANGE": Option("celeryctl"),
         "BROADCAST_EXCHANGE_TYPE": Option("fanout"),

+ 3 - 0
celery/app/task/__init__.py

@@ -99,6 +99,9 @@ class TaskType(type):
                 task_name = task_cls.name = '.'.join([task_cls.app.main, name])
             tasks.register(task_cls)
         task = tasks[task_name].__class__
+
+        # decorate with annotations from config.
+        task.app.annotate_task(task)
         return task
 
     def __repr__(cls):

+ 57 - 0
docs/configuration.rst

@@ -40,10 +40,67 @@ It should contain all you need to run a basic Celery set-up.
     ## available will be used.
     CELERYD_CONCURRENCY = 10
 
+    CELERY_ANNOTATIONS = {"tasks.add": {"rate_limit": "10/s"}}
+
 
 Configuration Directives
 ========================
 
+.. _conf-tasks:
+
+Task settings
+-------------
+
+.. setting:: CELERY_ANNOTATIONS
+
+CELERY_ANNOTATIONS
+~~~~~~~~~~~~~~~~~~
+
+This setting can be used to rewrite any task attribute from the
+configuration.  The setting can be a dict, or a list of annotation
+objects that filter for tasks and return a map of attributes
+to change.
+
+
+This will change the ``rate_limit`` attribute for the ``tasks.add``
+task:
+
+.. code-block:: python
+
+    CELERY_ANNOTATIONS = {"tasks.add": {"rate_limit": "10/s"}}
+
+or change the same for all tasks:
+
+.. code-block:: python
+
+    CELERY_ANNOTATIONS = {"*": {"rate_limit": "10/s"}}
+
+
+You can change methods too, for example the ``on_failure`` handler:
+
+.. code-block:: python
+
+    def my_on_failure(self, exc, task_id, args, kwargs, einfo):
+        print("Oh no! Task failed: %r" % (exc, ))
+
+    CELERY_ANNOTATIONS = {"*": {"on_failure": my_on_failure}}
+
+
+If you need more flexibility then you can use objects
+instead of a dict to choose which tasks to annotate:
+
+.. code-block:: python
+
+    class MyAnnotate(object):
+
+        def annotate(self, task):
+            if task.name.startswith("tasks."):
+                return {"rate_limit": "10/s"}
+
+    CELERY_ANNOTATIONS = (MyAnnotate(), {...})
+
+
+
 .. _conf-concurrency:
 
 Concurrency settings