Browse Source

The celery.task module can now be used as a decorator (!!!)

Also you can now import group, chain, chord and subtask from the celery
module::

    from celery import task, group, chain

    @task
    def foo(..)
Ask Solem 13 years ago
parent
commit
fbb94b42ad

+ 1 - 1
Changelog

@@ -31,7 +31,7 @@
 
     .. code-block:: python
 
-        from celery.task import chain
+        from celery import chain
 
         # (2 + 2) * 8 / 2
         res = chain(add.subtask((4, 4)),

+ 5 - 5
FAQ

@@ -414,7 +414,7 @@ If you don't use the results for a task, make sure you set the
 
 .. code-block python
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def mytask():
         ...
 
@@ -492,7 +492,7 @@ How can I get the task id of the current task?
 
 **Answer**: The current id and more is available in the task request::
 
-    @task
+    @celery.task
     def mytask():
         cache.set(mytask.request.id, "Running")
 
@@ -534,7 +534,7 @@ Also, a common pattern is to use callback tasks:
 
 .. code-block:: python
 
-    @task()
+    @celery.task()
     def add(x, y, callback=None):
         result = x + y
         if callback:
@@ -542,7 +542,7 @@ Also, a common pattern is to use callback tasks:
         return result
 
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def log_result(result, **kwargs):
         logger = log_result.get_logger(**kwargs)
         logger.info("log_result got: %s" % (result, ))
@@ -651,7 +651,7 @@ this is rarely the case. Imagine the following task:
 
 .. code-block:: python
 
-    @task()
+    @celery.task()
     def process_upload(filename, tmpfile):
         # Increment a file count stored in a database
         increment_file_counter()

+ 54 - 1
celery/__compat__.py

@@ -1,16 +1,25 @@
 from __future__ import absolute_import
 
+import operator
 import sys
 
 from types import ModuleType
 
 from .local import Proxy
-from .utils.compat import fun_of_method
 
 MODULE_DEPRECATED = """
 The module %s is deprecated and will be removed in a future version.
 """
 
+# im_func is no longer available in Py3.
+# instead the unbound method itself can be used.
+if sys.version_info[0] == 3:
+    def fun_of_method(method):
+        return method
+else:
+    def fun_of_method(method):  # noqa
+        return method.im_func
+
 
 def _compat_task_decorator(*args, **kwargs):
     from celery import current_app
@@ -100,3 +109,47 @@ class class_property(object):
 
 def reclassmethod(method):
     return classmethod(fun_of_method(method))
+
+
+class MagicModule(ModuleType):
+    _compat_modules = ()
+    _all_by_module = {}
+    _direct = {}
+
+    def __getattr__(self, name):
+        origins = self._object_origins
+        if name in origins:
+            module = __import__(origins[name], None, None, [name])
+            for extra_name in self._all_by_module[module.__name__]:
+                setattr(self, extra_name, getattr(module, extra_name))
+            return getattr(module, name)
+        elif name in self._direct:
+            module = __import__(self._direct[name], None, None, [name])
+            setattr(self, name, module)
+            return module
+        elif name in self._compat_modules:
+            setattr(self, name, get_compat(self.current_app, self, name))
+        return ModuleType.__getattribute__(self, name)
+
+    def __dir__(self):
+        return list(set(self.__all__
+                     + ("__file__", "__path__", "__doc__", "__all__")))
+
+
+
+def create_magic_module(name, compat_modules=(), by_module={}, direct={},
+        base=MagicModule, **attrs):
+    old_module = sys.modules[name]
+    origins = {}
+    for module, items in by_module.iteritems():
+        for item in items:
+            origins[item] = module
+
+    cattrs = dict(_compat_modules=compat_modules,
+                  _all_by_module=by_module, _direct=direct,
+                  _object_origins=origins,
+                  __all__=tuple(set(reduce(operator.add, map(tuple, [
+                                compat_modules, origins, direct, attrs])))))
+    new_module = sys.modules[name] = type(name, (base, ), cattrs)(name)
+    new_module.__dict__.update(attrs)
+    return old_module, new_module

+ 23 - 46
celery/__init__.py

@@ -24,49 +24,7 @@ if sys.version_info < (2, 5):
 # Lazy loading
 from types import ModuleType
 from .local import Proxy
-
-
-compat_modules = ("messaging", "log", "registry", "decorators")
-
-
-class module(ModuleType):
-    __all__ = ("Celery", "current_app", "bugreport")
-
-    def __getattr__(self, name):
-        if name in compat_modules:
-            from .__compat__ import get_compat
-            setattr(self, name, get_compat(self.current_app, self, name))
-        return ModuleType.__getattribute__(self, name)
-
-    def __dir__(self):
-        result = list(new_module.__all__)
-        result.extend(("__file__", "__path__", "__doc__", "__all__",
-                       "__docformat__", "__name__", "__path__", "VERSION",
-                       "__package__", "__version__", "__author__",
-                       "__contact__", "__homepage__", "__docformat__"))
-        return result
-
-# 2.5 does not define __package__
-try:
-    package = __package__
-except NameError:
-    package = "kombu"
-
-# keep a reference to this module so that it's not garbage collected
-old_module = sys.modules[__name__]
-
-new_module = sys.modules[__name__] = module(__name__)
-new_module.__dict__.update({
-    "__file__": __file__,
-    "__path__": __path__,
-    "__doc__": __doc__,
-    "__version__": __version__,
-    "__author__": __author__,
-    "__contact__": __contact__,
-    "__homepage__": __homepage__,
-    "__docformat__": __docformat__,
-    "__package__": package,
-    "VERSION": VERSION})
+from .__compat__ import create_magic_module
 
 
 def Celery(*args, **kwargs):
@@ -83,6 +41,25 @@ current_app = Proxy(_get_current_app)
 def bugreport():
     return current_app.bugreport()
 
-new_module.Celery = Celery
-new_module.current_app = current_app
-new_module.bugreport = bugreport
+
+old_module, new_module = create_magic_module(__name__,
+    compat_modules=("messaging", "log", "registry", "decorators"),
+    by_module={
+        "celery.task.sets": ["chain", "group", "subtask"],
+        "celery.task.chords": ["chord"],
+    },
+    direct={"task": "celery.task"},
+    __package__="celery",
+    __file__=__file__,
+    __path__=__path__,
+    __doc__=__doc__,
+    __version__=__version__,
+    __author__=__author__,
+    __contact__=__contact__,
+    __homepage__=__homepage__,
+    __docformat__=__docformat__,
+    VERSION=VERSION,
+    Celery=Celery,
+    current_app=current_app,
+    bugreport=bugreport,
+)

+ 27 - 92
celery/task/__init__.py

@@ -11,100 +11,35 @@
 """
 from __future__ import absolute_import
 
+import sys
+
 from .. import current_app
+from ..__compat__ import MagicModule, create_magic_module
 from ..app import current_task as _current_task
 from ..local import Proxy
-from ..utils import uuid
-
-from .base import BaseTask, Task, PeriodicTask  # noqa
-from .sets import group, TaskSet, subtask       # noqa
-from .chords import chord                       # noqa
-from .control import discard_all                # noqa
-
-current = Proxy(_current_task)
-
-
-def task(*args, **kwargs):
-    """Decorator to create a task class out of any callable.
-
-    **Examples**
-
-    .. code-block:: python
-
-        @task
-        def refresh_feed(url):
-            return Feed.objects.get(url=url).refresh()
-
-    With setting extra options and using retry.
-
-    .. code-block:: python
-
-        @task(max_retries=10)
-        def refresh_feed(url):
-            try:
-                return Feed.objects.get(url=url).refresh()
-            except socket.error, exc:
-                refresh_feed.retry(exc=exc)
-
-    Calling the resulting task:
-
-            >>> refresh_feed("http://example.com/rss") # Regular
-            <Feed: http://example.com/rss>
-            >>> refresh_feed.delay("http://example.com/rss") # Async
-            <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
-    """
-    return current_app.task(*args, **dict({"accept_magic_kwargs": False,
-                                           "base": Task}, **kwargs))
-
-
-def periodic_task(*args, **options):
-    """Decorator to create a task class out of any callable.
-
-        .. admonition:: Examples
-
-            .. code-block:: python
-
-                @task
-                def refresh_feed(url):
-                    return Feed.objects.get(url=url).refresh()
-
-            With setting extra options and using retry.
-
-            .. code-block:: python
-
-                from celery.task import current
-
-                @task(exchange="feeds")
-                def refresh_feed(url):
-                    try:
-                        return Feed.objects.get(url=url).refresh()
-                    except socket.error, exc:
-                        current.retry(exc=exc)
-
-            Calling the resulting task:
-
-                >>> refresh_feed("http://example.com/rss") # Regular
-                <Feed: http://example.com/rss>
-                >>> refresh_feed.delay("http://example.com/rss") # Async
-                <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
-
-    """
-    return task(**dict({"base": PeriodicTask}, **options))
-
-backend_cleanup = Proxy(lambda: current_app.tasks["celery.backend_cleanup"])
-
-
-class chain(object):
 
-    def __init__(self, *tasks):
-        self.tasks = tasks
 
-    def apply_async(self, **kwargs):
-        tasks = [task.clone(task_id=uuid(), **kwargs)
-                    for task in self.tasks]
-        reduce(lambda a, b: a.link(b), tasks)
-        tasks[0].apply_async()
-        results = [task.type.AsyncResult(task.options["task_id"])
-                        for task in tasks]
-        reduce(lambda a, b: a.set_parent(b), reversed(results))
-        return results[-1]
+class module(MagicModule):
+
+    def __call__(self, *args, **kwargs):
+        return self.task(*args, **kwargs)
+
+
+old_module, new_module = create_magic_module(__name__,
+    by_module={
+        "celery.task.base": ["BaseTask", "Task", "PeriodicTask",
+                             "task", "periodic_task"],
+        "celery.task.sets": ["chain", "group", "TaskSet", "subtask"],
+        "celery.task.chords": ["chord"],
+    },
+    base=module,
+    __package__="celery.task",
+    __file__=__file__,
+    __path__=__path__,
+    __doc__=__doc__,
+    current=Proxy(_current_task),
+    discard_all=Proxy(lambda: current_app.control.discard_all),
+    backend_cleanup=Proxy(
+        lambda: current_app.tasks["celery.backend_cleanup"]
+    ),
+)

+ 69 - 0
celery/task/base.py

@@ -13,6 +13,7 @@
 """
 from __future__ import absolute_import
 
+from .. import current_app
 from ..__compat__ import class_property, reclassmethod
 from ..app.task import Context, TaskType, BaseTask  # noqa
 from ..schedules import maybe_schedule
@@ -64,3 +65,71 @@ class PeriodicTask(Task):
                 "options": cls.options or {},
                 "relative": cls.relative,
         }
+
+
+def task(*args, **kwargs):
+    """Decorator to create a task class out of any callable.
+
+    **Examples**
+
+    .. code-block:: python
+
+        @task
+        def refresh_feed(url):
+            return Feed.objects.get(url=url).refresh()
+
+    With setting extra options and using retry.
+
+    .. code-block:: python
+
+        @task(max_retries=10)
+        def refresh_feed(url):
+            try:
+                return Feed.objects.get(url=url).refresh()
+            except socket.error, exc:
+                refresh_feed.retry(exc=exc)
+
+    Calling the resulting task:
+
+            >>> refresh_feed("http://example.com/rss") # Regular
+            <Feed: http://example.com/rss>
+            >>> refresh_feed.delay("http://example.com/rss") # Async
+            <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
+    """
+    return current_app.task(*args, **dict({"accept_magic_kwargs": False,
+                                           "base": Task}, **kwargs))
+
+
+def periodic_task(*args, **options):
+    """Decorator to create a task class out of any callable.
+
+        .. admonition:: Examples
+
+            .. code-block:: python
+
+                @task
+                def refresh_feed(url):
+                    return Feed.objects.get(url=url).refresh()
+
+            With setting extra options and using retry.
+
+            .. code-block:: python
+
+                from celery.task import current
+
+                @task(exchange="feeds")
+                def refresh_feed(url):
+                    try:
+                        return Feed.objects.get(url=url).refresh()
+                    except socket.error, exc:
+                        current.retry(exc=exc)
+
+            Calling the resulting task:
+
+                >>> refresh_feed("http://example.com/rss") # Regular
+                <Feed: http://example.com/rss>
+                >>> refresh_feed.delay("http://example.com/rss") # Async
+                <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
+
+    """
+    return task(**dict({"base": PeriodicTask}, **options))

+ 16 - 0
celery/task/sets.py

@@ -132,6 +132,22 @@ def maybe_subtask(t):
     return t
 
 
+class chain(object):
+
+    def __init__(self, *tasks):
+        self.tasks = tasks
+
+    def apply_async(self, **kwargs):
+        tasks = [task.clone(task_id=uuid(), **kwargs)
+                    for task in self.tasks]
+        reduce(lambda a, b: a.link(b), tasks)
+        tasks[0].apply_async()
+        results = [task.type.AsyncResult(task.options["task_id"])
+                        for task in tasks]
+        reduce(lambda a, b: a.set_parent(b), reversed(results))
+        return results[-1]
+
+
 class group(UserList):
     """A task containing several subtasks, making it possible
     to track how many, or when all of the tasks have been completed.

+ 1 - 2
docs/configuration.rst

@@ -1097,10 +1097,9 @@ Example:
 
 .. code-block:: python
 
-    from celery.task import task
     from celery.exceptions import SoftTimeLimitExceeded
 
-    @task()
+    @celery.task
     def mytask():
         try:
             return do_work()

+ 2 - 2
docs/django/first-steps-with-django.rst

@@ -72,9 +72,9 @@ a new file called ``celerytest/tasks.py``:
 
 .. code-block:: python
 
-    from celery.task import task
+    from celery import task
 
-    @task()
+    @task
     def add(x, y):
         return x + y
 

+ 1 - 1
docs/includes/introduction.txt

@@ -103,7 +103,7 @@ adding two numbers:
 
 .. code-block:: python
 
-    from celery.task import task
+    from celery import task
 
     @task
     def add(x, y):

+ 1 - 1
docs/internals/guide.rst

@@ -186,7 +186,7 @@ Here's an example using Celery in single-mode:
 
 .. code-block:: python
 
-    from celery.task import task
+    from celery import task
     from celery.task.control import inspect
 
     from .models import CeleryStats

+ 1 - 1
docs/tutorials/debugging.rst

@@ -15,8 +15,8 @@ Example usage:
 
 .. code-block:: python
 
+    from celery import task
     from celery.contrib import rdb
-    from celery.task import task
 
     @task
     def add(x, y):

+ 1 - 1
docs/userguide/executing.rst

@@ -41,7 +41,7 @@ called `add`, returning the sum of two positional arguments:
 
 .. code-block:: python
 
-    @task
+    @celery.task
     def add(x, y):
         return x + y
 

+ 27 - 27
docs/userguide/tasks.rst

@@ -25,7 +25,7 @@ Given a function create_user`, that takes two arguments: `username` and
 
     from django.contrib.auth import User
 
-    @task
+    @celery.task
     def create_user(username, password):
         User.objects.create(username=username, password=password)
 
@@ -34,7 +34,7 @@ Task options are added as arguments to `task`:
 
 .. code-block:: python
 
-    @task(serializer="json")
+    @celery.task(serializer="json")
     def create_user(username, password):
         User.objects.create(username=username, password=password)
 
@@ -81,7 +81,7 @@ Example Usage
 
 ::
 
-    @task
+    @celery.task
     def add(x, y):
         print("Executing task id %r, args: %r kwargs: %r" % (
             add.request.id, add.request.args, add.request.kwargs))
@@ -96,7 +96,7 @@ the worker log:
 
 .. code-block:: python
 
-    @task
+    @celery.task
     def add(x, y):
         logger = add.get_logger()
         logger.info("Adding %s + %s" % (x, y))
@@ -119,7 +119,7 @@ It will do the right thing, and respect the
 
 .. code-block:: python
 
-    @task
+    @celery.task
     def send_twitter_status(oauth, tweet):
         try:
             twitter = Twitter(oauth)
@@ -160,7 +160,7 @@ You can also provide the `countdown` argument to
 
 .. code-block:: python
 
-    @task(default_retry_delay=30 * 60)  # retry in 30 minutes.
+    @celery.task(default_retry_delay=30 * 60)  # retry in 30 minutes.
     def add(x, y):
         try:
             ...
@@ -370,7 +370,7 @@ For example:
 
 .. code-block:: python
 
-    >>> @task(name="sum-of-two-numbers")
+    >>> @celery.task(name="sum-of-two-numbers")
     >>> def add(x, y):
     ...     return x + y
 
@@ -383,7 +383,7 @@ another module:
 
 .. code-block:: python
 
-    >>> @task(name="tasks.add")
+    >>> @celery.task(name="tasks.add")
     >>> def add(x, y):
     ...     return x + y
 
@@ -396,7 +396,7 @@ task if the module name is "tasks.py":
 
 .. code-block:: python
 
-    >>> @task()
+    >>> @celery.task()
     >>> def add(x, y):
     ...     return x + y
 
@@ -450,14 +450,14 @@ decorator is applied last:
 
 .. code-block:: python
 
-    @task
+    @celery.task
     @decorator2
     @decorator1
     def add(x, y):
         return x + y
 
 
-Which means the `@task` decorator must be the top statement.
+Which means the `@celery.task` decorator must be the top statement.
 
 .. _task-states:
 
@@ -638,7 +638,7 @@ which defines its own custom :state:`ABORTED` state.
 Use :meth:`Task.update_state <celery.task.base.BaseTask.update_state>` to
 update a task's state::
 
-    @task
+    @celery.task
     def upload_files(filenames):
         for i, file in enumerate(filenames):
             upload_files.update_state(state="PROGRESS",
@@ -720,7 +720,7 @@ The following code,
 
 .. code-block:: python
 
-    @task
+    @celery.task
     def add(x, y):
         return x + y
 
@@ -729,7 +729,7 @@ will do roughly this behind the scenes:
 
 .. code-block:: python
 
-    @task
+    @celery.task
     class AddTask(Task):
 
         def run(self, x, y):
@@ -791,7 +791,7 @@ base class for new task types.
             print("Task returned: %r" % (self.request, ))
 
 
-    @task(base=DebugTask)
+    @celery.task(base=DebugTask)
     def add(x, y):
         return x + y
 
@@ -941,7 +941,7 @@ wastes time and resources.
 
 .. code-block:: python
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def mytask(...)
         something()
 
@@ -978,21 +978,21 @@ Make your design asynchronous instead, for example by using *callbacks*.
 
 .. code-block:: python
 
-    @task
+    @celery.task
     def update_page_info(url):
         page = fetch_page.delay(url).get()
         info = parse_page.delay(url, page).get()
         store_page_info.delay(url, info)
 
-    @task
+    @celery.task
     def fetch_page(url):
         return myhttplib.get(url)
 
-    @task
+    @celery.task
     def parse_page(url, page):
         return myparser.parse_document(page)
 
-    @task
+    @celery.task
     def store_page_info(url, info):
         return PageInfo.objects.create(url, info)
 
@@ -1001,13 +1001,13 @@ Make your design asynchronous instead, for example by using *callbacks*.
 
 .. code-block:: python
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def update_page_info(url):
         # fetch_page -> parse_page -> store_page
         fetch_page.delay(url, callback=subtask(parse_page,
                                     callback=subtask(store_page_info)))
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def fetch_page(url, callback=None):
         page = myhttplib.get(url)
         if callback:
@@ -1016,13 +1016,13 @@ Make your design asynchronous instead, for example by using *callbacks*.
             # into a subtask object.
             subtask(callback).delay(url, page)
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def parse_page(url, page, callback=None):
         info = myparser.parse_document(page)
         if callback:
             subtask(callback).delay(url, info)
 
-    @task(ignore_result=True)
+    @celery.task(ignore_result=True)
     def store_page_info(url, info):
         PageInfo.objects.create(url, info)
 
@@ -1122,7 +1122,7 @@ that automatically expands some abbreviations in it:
         title = models.CharField()
         body = models.TextField()
 
-    @task
+    @celery.task
     def expand_abbreviations(article):
         article.body.replace("MyCorp", "My Corporation")
         article.save()
@@ -1143,7 +1143,7 @@ re-fetch the article in the task body:
 
 .. code-block:: python
 
-    @task
+    @celery.task
     def expand_abbreviations(article_id):
         article = Article.objects.get(id=article_id)
         article.body.replace("MyCorp", "My Corporation")
@@ -1302,7 +1302,7 @@ blog/tasks.py
     from blog.models import Comment
 
 
-    @task
+    @celery.task
     def spam_filter(comment_id, remote_addr=None):
         logger = spam_filter.get_logger()
         logger.info("Running spam filter for comment %s" % comment_id)

+ 12 - 8
docs/userguide/tasksets.rst

@@ -15,11 +15,17 @@ Subtasks
 .. versionadded:: 2.0
 
 The :class:`~celery.task.sets.subtask` type is used to wrap the arguments and
-execution options for a single task invocation::
+execution options for a single task invocation:
+
+.. code-block:: python
+
+    from celery import subtask
 
     subtask(task_name_or_cls, args, kwargs, options)
 
-For convenience every task also has a shortcut to create subtasks::
+For convenience every task also has a shortcut to create subtasks:
+
+.. code-block:: python
 
     task.subtask(args, kwargs, options)
 
@@ -81,7 +87,7 @@ tasks were invoked.
 
 ``group`` takes a list of :class:`~celery.task.sets.subtask`'s::
 
-    >>> from celery.task import group
+    >>> from celery import group
     >>> from tasks import add
 
     >>> job = group([
@@ -175,13 +181,11 @@ already a standard function):
 
 .. code-block:: python
 
-    from celery.task import task
-
-    @task
+    @celery.task
     def add(x, y):
         return x + y
 
-    @task
+    @celery.task
     def tsum(numbers):
         return sum(numbers)
 
@@ -189,7 +193,7 @@ already a standard function):
 Now we can use a chord to calculate each addition step in parallel, and then
 get the sum of the resulting numbers::
 
-    >>> from celery.task import chord
+    >>> from celery import chord
     >>> from tasks import add, tsum
 
     >>> chord(add.subtask((i, i))