ソースを参照

Annotations now supports decorators if the key startswith '@'.

E.g.::

    def debug_args(fun):

        @wraps(fun)
        def _inner(*args, **kwargs):
            print("ARGS: %r" % (args, ))
            return fun(*args, **kwargs)

        return _inner

    CELERY_ANNOTATIONS = {
        "tasks.add": {"@__call__": debug_args},
    }

Also tasks are now always bound by class so that
annotated methods end up being bound.
Ask Solem 13 年 前
コミット
e3418c22a2
3 ファイル変更35 行追加17 行削除
  1. 4 1
      celery/app/base.py
  2. 29 12
      celery/app/task.py
  3. 2 4
      celery/task/base.py

+ 4 - 1
celery/app/base.py

@@ -147,12 +147,15 @@ class Celery(object):
 
     def finalize(self):
         if not self.finalized:
+            self.finalized = True
             load_shared_tasks(self)
 
             pending = self._pending
             while pending:
                 maybe_evaluate(pending.pop())
-            self.finalized = True
+
+            for task in self._tasks.itervalues():
+                task.bind(self)
 
     def config_from_object(self, obj, silent=False):
         del(self.conf)

+ 29 - 12
celery/app/task.py

@@ -21,6 +21,7 @@ from kombu.utils import cached_property
 
 from celery import current_app
 from celery import states
+from celery.__compat__ import class_property
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 from celery.result import EagerResult
@@ -304,8 +305,9 @@ class BaseTask(object):
     # - Tasks are lazily bound, so that configuration is not set
     # - until the task is actually used
 
+    @classmethod
     def bind(self, app):
-        self.__bound__ = True
+        was_bound, self.__bound__ = self.__bound__, True
         self._app = app
         conf = app.conf
 
@@ -319,25 +321,46 @@ class BaseTask(object):
             self.backend = app.backend
 
         # decorate with annotations from config.
-        self.annotate()
+        if not was_bound:
+            self.annotate()
 
         # PeriodicTask uses this to add itself to the PeriodicTask schedule.
         self.on_bound(app)
 
         return app
 
+    @classmethod
     def on_bound(self, app):
         """This method can be defined to do additional actions when the
         task class is bound to an app."""
         pass
 
+    @classmethod
     def _get_app(self):
         if not self.__bound__ or self._app is None:
             # The app property's __set__  method is not called
             # if Task.app is set (on the class), so must bind on use.
             self.bind(current_app)
         return self._app
-    app = property(_get_app, bind)
+    app = class_property(_get_app, bind)
+
+    @classmethod
+    def annotate(self):
+        for d in resolve_all_annotations(self.app.annotations, self):
+            for key, value in d.iteritems():
+                if key.startswith('@'):
+                    self.add_around(key[1:], value)
+                else:
+                    setattr(self, key, value)
+
+    @classmethod
+    def add_around(self, attr, around):
+        orig = getattr(self, attr)
+        if getattr(orig, "__wrapped__", None):
+            orig = orig.__wrapped__
+        meth = around(orig)
+        meth.__wrapped__ = orig
+        setattr(self, attr, meth)
 
     def __call__(self, *args, **kwargs):
         return self.run(*args, **kwargs)
@@ -682,13 +705,12 @@ class BaseTask(object):
                                         if key in supported_keys)
             kwargs.update(extend_with)
 
+        tb = None
         retval, info = eager_trace_task(task, task_id, args, kwargs,
                                         request=request, propagate=throw)
         if isinstance(retval, ExceptionInfo):
-            retval = retval.exception
-        state, tb = states.SUCCESS, ''
-        if info is not None:
-            state, tb = info.state, info.strtb
+            retval, tb = retval.exception, retval.traceback
+        state = states.SUCCESS if info is None else info.state
         return EagerResult(task_id, retval, state, traceback=tb)
 
     def AsyncResult(self, task_id):
@@ -823,11 +845,6 @@ class BaseTask(object):
         """
         request.execute_using_pool(pool, loglevel, logfile)
 
-    def annotate(self):
-        for d in resolve_all_annotations(self.app.annotations, self):
-            for key, value in d.iteritems():
-                setattr(self, key, value)
-
     def __repr__(self):
         """`repr(task)`"""
         return "<@task: %s>" % (self.name, )

+ 2 - 4
celery/task/base.py

@@ -14,15 +14,14 @@
 from __future__ import absolute_import
 
 from celery import current_app
-from celery.__compat__ import class_property, reclassmethod
+from celery.__compat__ import reclassmethod
 from celery.app.task import Context, TaskType, BaseTask  # noqa
 from celery.schedules import maybe_schedule
 
 #: list of methods that must be classmethods in the old API.
 _COMPAT_CLASSMETHODS = (
     "get_logger", "establish_connection", "get_publisher", "get_consumer",
-    "delay", "apply_async", "retry", "apply", "AsyncResult", "subtask",
-    "bind", "on_bound", "_get_app", "annotate")
+    "delay", "apply_async", "retry", "apply", "AsyncResult", "subtask")
 
 
 class Task(BaseTask):
@@ -36,7 +35,6 @@ class Task(BaseTask):
     # given us pain (like all magic).
     for name in _COMPAT_CLASSMETHODS:
         locals()[name] = reclassmethod(getattr(BaseTask, name))
-    app = class_property(_get_app, bind)  # noqa
 
 
 class PeriodicTask(Task):