Browse Source

The @task decorator is now lazy when used with custom apps

If ``accept_magic_kwargs`` is enabled (herby called "compat mode"), the task
decorator executes inline like before, however for custom apps the @task
decorator now returns a special PromiseProxy object that is only evaluated
on access.

All promises will be evaluated when `app.finalize` is called, or implicitly
when the task registry is first used.
Ask Solem 13 years ago
parent
commit
5ff76f3a48
4 changed files with 82 additions and 12 deletions
  1. 28 8
      celery/app/__init__.py
  2. 13 0
      celery/app/base.py
  3. 40 3
      celery/local.py
  4. 1 1
      celery/tests/test_task/__init__.py

+ 28 - 8
celery/app/__init__.py

@@ -15,6 +15,7 @@ from __future__ import absolute_import
 import os
 import threading
 
+from ..local import PromiseProxy
 from ..utils import cached_property, instantiate
 
 from . import annotations
@@ -155,20 +156,28 @@ class App(base.BaseApp):
             >>> refresh_feed.delay("http://example.com/rss") # Async
             <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
 
+        .. admonition:: App Binding
+
+            For custom apps the task decorator returns proxy
+            objects, so that the act of creating the task is not performed
+            until the task is used or the task registry is accessed.
+
+            If you are depending on binding to be deferred, then you must
+            not access any attributes on the returned object until the
+            application is fully set up (finalized).
+
         """
 
         def inner_create_task_cls(**options):
 
             def _create_task_cls(fun):
-                base = options.pop("base", None) or self.Task
+                if self.accept_magic_kwargs:  # compat mode
+                    return self._task_from_fun(fun, **options)
 
-                T = type(fun.__name__, (base, ), dict({
-                        "app": self,
-                        "accept_magic_kwargs": False,
-                        "run": staticmethod(fun),
-                        "__doc__": fun.__doc__,
-                        "__module__": fun.__module__}, **options))()
-                return self._tasks[T.name]  # global instance.
+                # return a proxy object that is only evaluated when first used
+                promise = PromiseProxy(self._task_from_fun, (fun, ), options)
+                self._pending.append(promise)
+                return promise
 
             return _create_task_cls
 
@@ -176,6 +185,17 @@ class App(base.BaseApp):
             return inner_create_task_cls(**options)(*args)
         return inner_create_task_cls(**options)
 
+    def _task_from_fun(self, fun, **options):
+        base = options.pop("base", None) or self.Task
+
+        T = type(fun.__name__, (base, ), dict({
+                "app": self,
+                "accept_magic_kwargs": False,
+                "run": staticmethod(fun),
+                "__doc__": fun.__doc__,
+                "__module__": fun.__module__}, **options))()
+        return self._tasks[T.name]  # return global instance.
+
     def annotate_task(self, task):
         if self.annotations:
             match = annotations._first_match(self.annotations, task)

+ 13 - 0
celery/app/base.py

@@ -16,6 +16,7 @@ import os
 import warnings
 import platform as _platform
 
+from collections import deque
 from contextlib import contextmanager
 from copy import deepcopy
 from functools import wraps
@@ -25,6 +26,7 @@ from kombu.clocks import LamportClock
 from .. import datastructures
 from .. import platforms
 from ..exceptions import AlwaysEagerIgnored
+from ..local import maybe_evaluate
 from ..utils import cached_property, instantiate, lpmerge
 
 from .defaults import DEFAULTS, find_deprecated_settings, find
@@ -110,12 +112,22 @@ class BaseApp(object):
         self.registry_cls = self.registry_cls if tasks is None else tasks
         self._tasks = instantiate(self.registry_cls)
 
+        self._pending = deque()
+        self.finalized = False
+
         self.on_init()
 
     def on_init(self):
         """Called at the end of the constructor."""
         pass
 
+    def finalize(self):
+        if not self.finalized:
+            pending = self._pending
+            while pending:
+                maybe_evaluate(pending.pop())
+            self.finalized = True
+
     def config_from_object(self, obj, silent=False):
         """Read configuration from object, where object is either
         a object, or the name of a module to import.
@@ -394,4 +406,5 @@ class BaseApp(object):
     def tasks(self):
         from .task.builtins import load_builtins
         load_builtins(self)
+        self.finalize()
         return self._tasks

+ 40 - 3
celery/local.py

@@ -28,10 +28,12 @@ class Proxy(object):
     """Proxy to another object."""
 
     # Code stolen from werkzeug.local.Proxy.
-    __slots__ = ('__local', '__dict__', '__name__')
+    __slots__ = ('__local', '__args', '__kwargs', '__dict__', '__name__')
 
-    def __init__(self, local, name=None):
+    def __init__(self, local, args=None, kwargs=None, name=None):
         object.__setattr__(self, '_Proxy__local', local)
+        object.__setattr__(self, '_Proxy__args', args or ())
+        object.__setattr__(self, '_Proxy__kwargs', kwargs or {})
         object.__setattr__(self, '__custom_name__', name)
 
     @property
@@ -45,13 +47,17 @@ class Proxy(object):
     def __doc__(self):
         return self._get_current_object().__doc__
 
+    @property
+    def __class__(self):
+        return self._get_current_object().__class__
+
     def _get_current_object(self):
         """Return the current object.  This is useful if you want the real
         object behind the proxy at a time for performance reasons or because
         you want to pass the object into a different context.
         """
         if not hasattr(self.__local, '__release_local__'):
-            return self.__local()
+            return self.__local(*self.__args, **self.__kwargs)
         try:
             return getattr(self.__local, self.__name__)
         except AttributeError:
@@ -151,3 +157,34 @@ class Proxy(object):
     __coerce__ = lambda x, o: x.__coerce__(x, o)
     __enter__ = lambda x: x.__enter__()
     __exit__ = lambda x, *a, **kw: x.__exit__(*a, **kw)
+    __reduce__ = lambda x: x._get_current_object().__reduce__()
+
+
+class PromiseProxy(Proxy):
+    """This is a proxy to an object that has not yet been evaulated.
+
+    :class:`Proxy` will evaluate the object each time, while the
+    promise will only evaluate it once.
+
+    """
+
+    def _get_current_object(self):
+        try:
+            return object.__getattribute__(self, "__thing")
+        except AttributeError:
+            return self.__evaluate__()
+
+    def __maybe_evaluate__(self):
+        return self._get_current_object()
+
+    def __evaluate__(self):
+        thing = Proxy._get_current_object(self)
+        object.__setattr__(self, "__thing", thing)
+        return thing
+
+
+def maybe_evaluate(obj):
+    try:
+        return obj.__maybe_evaluate__()
+    except AttributeError:
+        return obj

+ 1 - 1
celery/tests/test_task/__init__.py

@@ -191,7 +191,7 @@ class TestCeleryTasks(Case):
         def xxx():
             pass
 
-        self.assertIs(pickle.loads(pickle.dumps(xxx)), xxx)
+        self.assertIs(pickle.loads(pickle.dumps(xxx)), xxx.app.tasks[xxx.name])
 
     def createTask(self, name):
         return task.task(__module__=self.__module__, name=name)(return_True)