Browse Source

group/chord/chain are now subtasks

- The source code for these, including subtask, has been moved
to new module celery.canvas.

- group is no longer an alias to TaskSet, but new alltogether,
since it was very difficult to migrate the TaskSet class to become
a subtask.

- A new shortcut has been added to tasks:

    >>> task.s(arg1, arg2, kw=1)

    as a shortcut to:

    >>> task.subtask((arg1, arg2), {"kw": 1})

- Tasks can be chained by using the | operator:

    >>> (add.s(2, 2), pow.s(2)).apply_async()

- Subtasks can be "evaluated" using the ~ operator:

    >>> ~add.s(2, 2)
    4

    >>> ~(add.s(2, 2) | pow.s(2))

    is the same as:

    >>> chain(add.s(2, 2), pow.s(2)).apply_async().get()

- A new subtask_type key has been added to the subtask dicts

    This can be the string "chord", "group", or "chain"

- maybe_subtask now uses subtask_type to reconstruct
  the object, to be used when using non-pickle serializers.

- The logic for these operations have been moved to dedicated
  tasks celery.chord, celery.chain and celery.group.

- subtask no longer inherits from AttributeDict.

    It's now a pure dict subclass with properties for attribute
    access to the relevant keys.

- The repr's now outputs how the sequence would like imperatively:

    >>> from celery import chord

    >>> (chord([add.s(i, i) for i in xrange(10)], xsum.s())
          | pow.s(2))
    tasks.xsum([tasks.add(0, 0),
                tasks.add(1, 1),
                tasks.add(2, 2),
                tasks.add(3, 3),
                tasks.add(4, 4),
                tasks.add(5, 5),
                tasks.add(6, 6),
                tasks.add(7, 7),
                tasks.add(8, 8),
                tasks.add(9, 9)]) | tasks.pow(2)
Ask Solem 13 years ago
parent
commit
35e56513db

+ 3 - 4
celery/__init__.py

@@ -20,10 +20,9 @@ from .__compat__ import recreate_module
 
 
 old_module, new_module = recreate_module(__name__,
 old_module, new_module = recreate_module(__name__,
     by_module={
     by_module={
-        "celery.app":         ["Celery", "bugreport"],
-        "celery.app.state":   ["current_app", "current_task"],
-        "celery.task.sets":   ["chain", "group", "subtask"],
-        "celery.task.chords": ["chord"],
+        "celery.app":       ["Celery", "bugreport"],
+        "celery.app.state": ["current_app", "current_task"],
+        "celery.canvas":    ["chain", "chord", "group", "subtask"],
     },
     },
     direct={"task": "celery.task"},
     direct={"task": "celery.task"},
     __package__="celery",
     __package__="celery",

+ 7 - 4
celery/app/__init__.py

@@ -16,14 +16,17 @@ import os
 from celery.local import Proxy
 from celery.local import Proxy
 
 
 from . import state
 from . import state
+from .state import (  # noqa
+        set_default_app,
+        get_current_app as current_app,
+        get_current_task as current_task,
+)
 from .base import Celery, AppPickler  # noqa
 from .base import Celery, AppPickler  # noqa
 
 
-set_default_app = state.set_default_app
-current_app = state.get_current_app
-current_task = state.get_current_task
+#: Proxy always returning the app set as default.
 default_app = Proxy(lambda: state.default_app)
 default_app = Proxy(lambda: state.default_app)
 
 
-#: Returns the app provided or the default app if none.
+#: Function returning the app provided or the default app if none.
 #:
 #:
 #: The environment variable :envvar:`CELERY_TRACE_APP` is used to
 #: The environment variable :envvar:`CELERY_TRACE_APP` is used to
 #: trace app leaks.  When enabled an exception is raised if there
 #: trace app leaks.  When enabled an exception is raised if there

+ 10 - 8
celery/app/base.py

@@ -112,12 +112,6 @@ class Celery(object):
         """Optional callback called at init."""
         """Optional callback called at init."""
         pass
         pass
 
 
-    def create_task_cls(self):
-        """Creates a base task class using default configuration
-        taken from this app."""
-        return self.subclass_with_self("celery.app.task:BaseTask", name="Task",
-                                       attribute="_app", abstract=True)
-
     def start(self, argv=None):
     def start(self, argv=None):
         """Run :program:`celery` using `argv`.  Uses :data:`sys.argv`
         """Run :program:`celery` using `argv`.  Uses :data:`sys.argv`
         if `argv` is not specified."""
         if `argv` is not specified."""
@@ -378,6 +372,8 @@ class Celery(object):
         return first(None, values) or self.conf.get(default_key)
         return first(None, values) or self.conf.get(default_key)
 
 
     def bugreport(self):
     def bugreport(self):
+        """Returns a string with information useful for the Celery core
+        developers when reporting a bug."""
         return bugreport(self)
         return bugreport(self)
 
 
     def _get_backend(self):
     def _get_backend(self):
@@ -396,6 +392,12 @@ class Celery(object):
             self._pool.force_close_all()
             self._pool.force_close_all()
             self._pool = None
             self._pool = None
 
 
+    def create_task_cls(self):
+        """Creates a base task class using default configuration
+        taken from this app."""
+        return self.subclass_with_self("celery.app.task:BaseTask", name="Task",
+                                       attribute="_app", abstract=True)
+
     def subclass_with_self(self, Class, name=None, attribute="app",
     def subclass_with_self(self, Class, name=None, attribute="app",
             reverse=None, **kw):
             reverse=None, **kw):
         """Subclass an app-compatible class by setting its app attribute
         """Subclass an app-compatible class by setting its app attribute
@@ -454,7 +456,7 @@ class Celery(object):
 
 
     @cached_property
     @cached_property
     def TaskSet(self):
     def TaskSet(self):
-        return self.subclass_with_self("celery.task.sets:group")
+        return self.subclass_with_self("celery.task.sets:TaskSet")
 
 
     @cached_property
     @cached_property
     def Task(self):
     def Task(self):
@@ -515,7 +517,7 @@ class Celery(object):
 
 
     @cached_property
     @cached_property
     def log(self):
     def log(self):
-        """Logging utilities.  See :class:`~celery.log.Logging`."""
+        """Logging utilities.  See :class:`~celery.app.log.Logging`."""
         return instantiate(self.log_cls, app=self)
         return instantiate(self.log_cls, app=self)
 
 
     @cached_property
     @cached_property

+ 117 - 23
celery/app/builtins.py

@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 from __future__ import absolute_import
+from __future__ import with_statement
 
 
 from celery.utils import uuid
 from celery.utils import uuid
 
 
@@ -49,7 +50,7 @@ def add_unlock_chord_task(app):
     It creates a task chain polling the header for completion.
     It creates a task chain polling the header for completion.
 
 
     """
     """
-    from celery.task.sets import subtask
+    from celery.canvas import subtask
     from celery import result as _res
     from celery import result as _res
 
 
     @app.task(name="celery.chord_unlock", max_retries=None)
     @app.task(name="celery.chord_unlock", max_retries=None)
@@ -65,30 +66,123 @@ def add_unlock_chord_task(app):
     return unlock_chord
     return unlock_chord
 
 
 
 
+@builtin_task
+def add_group_task(app):
+    from celery.canvas import subtask
+    from celery.app.state import get_current_task
+    from celery.result import from_serializable
+
+    class Group(app.Task):
+        name = "celery.group"
+        accept_magic_kwargs = False
+
+        def run(self, tasks, result):
+            app = self.app
+            result = from_serializable(result)
+            with app.pool.acquire(block=True) as conn:
+                with app.amqp.TaskPublisher(conn) as publisher:
+                    res_ = [subtask(task).apply_async(
+                                        taskset_id=self.request.taskset,
+                                        publisher=publisher)
+                                for task in tasks]
+            parent = get_current_task()
+            if parent:
+                parent.request.children.append(result)
+            if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
+                return app.TaskSetResult(result.id, res_)
+            return result
+
+        def prepare(self, options, tasks, **kwargs):
+            r = []
+            options["taskset_id"] = group_id = \
+                    options.setdefault("task_id", uuid())
+            for task in tasks:
+                tid = task.options.setdefault("task_id", uuid())
+                task.options["taskset_id"] = group_id
+                r.append(self.AsyncResult(tid))
+            return tasks, self.app.TaskSetResult(group_id, r)
+
+        def apply_async(self, args=(), kwargs={}, **options):
+            if self.app.conf.CELERY_ALWAYS_EAGER:
+                return self.apply(args, kwargs, **options)
+            tasks, result = self.prepare(options, **kwargs)
+            super(Group, self).apply_async((tasks, result), **options)
+            return result
+
+        def apply(self, args=(), kwargs={}, **options):
+            tasks, result = self.prepare(options, **kwargs)
+            return super(Group, self).apply((tasks, result), {"eager": True},
+                                            **options)
+
+    return Group
+
+
+@builtin_task
+def add_chain_task(app):
+    from celery.canvas import maybe_subtask
+
+    class Chain(app.Task):
+        name = "celery.chain"
+        accept_magic_kwargs = False
+
+        def apply_async(self, args=(), kwargs={}, **options):
+            if self.app.conf.CELERY_ALWAYS_EAGER:
+                return self.apply(args, kwargs, **options)
+            tasks = kwargs["tasks"]
+            tasks = [maybe_subtask(task).clone(task_id=uuid(), **kwargs)
+                        for task in kwargs["tasks"]]
+            reduce(lambda a, b: a.link(b), tasks)
+            tasks[0].apply_async()
+            results = [task.type.AsyncResult(task.options["task_id"])
+                            for task in tasks]
+            reduce(lambda a, b: a.set_parent(b), reversed(results))
+            return results[-1]
+
+    return Chain
+
+
 @builtin_task
 @builtin_task
 def add_chord_task(app):
 def add_chord_task(app):
     """Every chord is executed in a dedicated task, so that the chord
     """Every chord is executed in a dedicated task, so that the chord
     can be used as a subtask, and this generates the task
     can be used as a subtask, and this generates the task
     responsible for that."""
     responsible for that."""
-    from celery.task.sets import TaskSet
-
-    @app.task(name="celery.chord", accept_magic_kwargs=False)
-    def chord(set, body, interval=1, max_retries=None,
-            propagate=False, **kwargs):
-
-        if not isinstance(set, TaskSet):
-            set = TaskSet(set)
-        r = []
-        setid = uuid()
-        for task in set.tasks:
-            tid = uuid()
-            task.options.update(task_id=tid, chord=body)
-            r.append(app.AsyncResult(tid))
-        app.backend.on_chord_apply(setid, body,
-                                   interval=interval,
-                                   max_retries=max_retries,
-                                   propagate=propagate,
-                                   result=r)
-        set.apply_async(taskset_id=setid)
-
-    return chord
+    from celery import group
+    from celery.canvas import maybe_subtask
+
+    class Chord(app.Task):
+        name = "celery.chord"
+        accept_magic_kwargs = False
+        ignore_result = True
+
+        def run(self, header, body, interval=1, max_retries=None,
+                propagate=False, eager=False, **kwargs):
+            if not isinstance(header, group):
+                header = group(header)
+            r = []
+            setid = uuid()
+            for task in header.tasks:
+                tid = task.options.setdefault("task_id", uuid())
+                task.options["chord"] = body
+                r.append(app.AsyncResult(tid))
+            app.backend.on_chord_apply(setid, body,
+                                       interval=interval,
+                                       max_retries=max_retries,
+                                       propagate=propagate,
+                                       result=r)
+            return header(taskset_id=setid)
+
+        def apply_async(self, args=(), kwargs={}, task_id=None, **options):
+            if self.app.conf.CELERY_ALWAYS_EAGER:
+                return self.apply(args, kwargs, **options)
+            body = maybe_subtask(kwargs["body"])
+
+            callback_id = body.options.setdefault("task_id", task_id or uuid())
+            super(Chord, self).apply_async(args, kwargs, **options)
+            return self.AsyncResult(callback_id)
+
+        def apply(self, args=(), kwargs={}, **options):
+            body = kwargs["body"]
+            res = super(Chord, self).apply(args, kwargs, **options)
+            return maybe_subtask(body).apply(args=(res.get().join(), ))
+
+    return Chord

+ 7 - 2
celery/app/task.py

@@ -702,12 +702,17 @@ class BaseTask(object):
                                                     task_name=self.name)
                                                     task_name=self.name)
 
 
     def subtask(self, *args, **kwargs):
     def subtask(self, *args, **kwargs):
-        """Returns :class:`~celery.task.sets.subtask` object for
+        """Returns :class:`~celery.subtask` object for
         this task, wrapping arguments and execution options
         this task, wrapping arguments and execution options
         for a single task invocation."""
         for a single task invocation."""
-        from celery.task.sets import subtask
+        from celery.canvas import subtask
+        print("SUBTASK: %r" % (subtask, ))
         return subtask(self, *args, **kwargs)
         return subtask(self, *args, **kwargs)
 
 
+    def s(self, *args, **kwargs):
+        """``.s(*a, **k) -> .subtask(a, k)``"""
+        return self.subtask(args, kwargs)
+
     def update_state(self, task_id=None, state=None, meta=None):
     def update_state(self, task_id=None, state=None, meta=None):
         """Update task state.
         """Update task state.
 
 

+ 1 - 1
celery/backends/cache.py

@@ -104,7 +104,7 @@ class CacheBackend(KeyValueStoreBackend):
         self.client.set(key, '0', time=86400)
         self.client.set(key, '0', time=86400)
 
 
     def on_chord_part_return(self, task, propagate=False):
     def on_chord_part_return(self, task, propagate=False):
-        from celery.task.sets import subtask
+        from celery import subtask
         from celery.result import TaskSetResult
         from celery.result import TaskSetResult
         setid = task.request.taskset
         setid = task.request.taskset
         if not setid:
         if not setid:

+ 1 - 1
celery/backends/redis.py

@@ -91,7 +91,7 @@ class RedisBackend(KeyValueStoreBackend):
         self.app.TaskSetResult(setid, result).save()
         self.app.TaskSetResult(setid, result).save()
 
 
     def on_chord_part_return(self, task, propagate=False):
     def on_chord_part_return(self, task, propagate=False):
-        from celery.task.sets import subtask
+        from celery import subtask
         from celery.result import TaskSetResult
         from celery.result import TaskSetResult
         setid = task.request.taskset
         setid = task.request.taskset
         if not setid:
         if not setid:

+ 2 - 1
celery/bin/celery.py

@@ -427,7 +427,8 @@ class shell(Command):
                        "BaseTask": celery.task.base.BaseTask,
                        "BaseTask": celery.task.base.BaseTask,
                        "chord": celery.chord,
                        "chord": celery.chord,
                        "group": celery.group,
                        "group": celery.group,
-                       "chain": celery.chain}
+                       "chain": celery.chain,
+                       "subtask": celery.subtask}
 
 
         if not without_tasks:
         if not without_tasks:
             self.locals.update(dict((task.__name__, task)
             self.locals.update(dict((task.__name__, task)

+ 296 - 0
celery/canvas.py

@@ -0,0 +1,296 @@
+"""
+    celery.canvas
+    ~~~~~~~~~~~~~
+
+    Designing task workflows.
+
+    :copyright: (c) 2009 - 2012 by Ask Solem.
+    :license: BSD, see LICENSE for more details.
+
+"""
+from __future__ import absolute_import
+
+from itertools import chain as _chain
+
+from kombu.utils import kwdict, reprcall
+
+from celery import current_app
+from celery.local import Proxy
+from celery.utils import cached_property, uuid
+from celery.utils.functional import maybe_list
+from celery.utils.compat import chain_from_iterable
+
+Chord = Proxy(lambda: current_app.tasks["celery.chord"])
+
+
+class _getitem_property(object):
+
+    def __init__(self, key):
+        self.key = key
+
+    def __get__(self, obj, type=None):
+        if obj is None:
+            return type
+        return obj[self.key]
+
+    def __set__(self, obj, value):
+        obj[self.key] = value
+
+
+class Signature(dict):
+    """Class that wraps the arguments and execution options
+    for a single task invocation.
+
+    Used as the parts in a :class:`group` or to safely
+    pass tasks around as callbacks.
+
+    :param task: Either a task class/instance, or the name of a task.
+    :keyword args: Positional arguments to apply.
+    :keyword kwargs: Keyword arguments to apply.
+    :keyword options: Additional options to :meth:`Task.apply_async`.
+
+    Note that if the first argument is a :class:`dict`, the other
+    arguments will be ignored and the values in the dict will be used
+    instead.
+
+        >>> s = subtask("tasks.add", args=(2, 2))
+        >>> subtask(s)
+        {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
+
+    """
+    TYPES = {}
+    _type = None
+
+    @classmethod
+    def register_type(cls, subclass, name=None):
+        cls.TYPES[name or subclass.__name__] = subclass
+        return subclass
+
+    @classmethod
+    def from_dict(self, d):
+        typ = d.get("subtask_type")
+        if typ:
+            return self.TYPES[typ].from_dict(d)
+        return Signature(d)
+
+    def __init__(self, task=None, args=None, kwargs=None, options=None,
+                type=None, subtask_type=None, **ex):
+        init = dict.__init__
+
+        if isinstance(task, dict):
+            return init(self, task)  # works like dict(d)
+
+        # Also supports using task class/instance instead of string name.
+        try:
+            task_name = task.name
+        except AttributeError:
+            task_name = task
+        else:
+            self._type = task
+
+        init(self, task=task_name, args=tuple(args or ()),
+                                   kwargs=kwargs or {},
+                                   options=dict(options or {}, **ex),
+                                   subtask_type=subtask_type)
+
+    def delay(self, *argmerge, **kwmerge):
+        """Shortcut to `apply_async(argmerge, kwargs)`."""
+        return self.apply_async(args=argmerge, kwargs=kwmerge)
+
+    def apply(self, args=(), kwargs={}, **options):
+        """Apply this task locally."""
+        # For callbacks: extra args are prepended to the stored args.
+        args, kwargs, options = self._merge(args, kwargs, options)
+        return self.type.apply(args, kwargs, **options)
+
+    def _merge(self, args=(), kwargs={}, options={}):
+        return (tuple(args) + tuple(self.args),
+                dict(self.kwargs, **kwargs),
+                dict(self.options, **options))
+
+    def clone(self, args=(), kwargs={}, **options):
+        args, kwargs, options = self._merge(args, kwargs, options)
+        s = self.from_dict({"task": self.task, "args": args,
+                            "kwargs": kwargs, "options": options,
+                            "subtask_type": self.subtask_type})
+        s._type = self._type
+        return s
+    partial = clone
+
+    def replace(self, args=None, kwargs=None, options=None):
+        s = self.clone()
+        if args is not None:
+            s.args = args
+        if kwargs is not None:
+            s.kwargs = kwargs
+        if options is not None:
+            s.options = options
+        return s
+
+    def set(self, **options):
+        self.options.update(options)
+        return self
+
+    def apply_async(self, args=(), kwargs={}, **options):
+        """Apply this task asynchronously."""
+        # For callbacks: extra args are prepended to the stored args.
+        args, kwargs, options = self._merge(args, kwargs, options)
+        return self.type.apply_async(args, kwargs, **options)
+
+    def link(self, callback):
+        """Add a callback task to be applied if this task
+        executes successfully."""
+        linked = self.options.setdefault("link", [])
+        if callback not in linked:
+            linked.append(callback)
+        return callback
+
+    def link_error(self, errback):
+        """Add a callback task to be applied if an error occurs
+        while executing this task."""
+        linked = self.options.setdefault("link_error", [])
+        if errback not in linked:
+            linked.append(errback)
+        return errback
+
+    def flatten_links(self):
+        """Gives a recursive list of dependencies (unchain if you will,
+        but with links intact)."""
+        return list(chain_from_iterable(_chain([[self]],
+                (link.flatten_links()
+                    for link in maybe_list(self.options.get("link")) or []))))
+
+    def __or__(self, other):
+        if isinstance(other, chain):
+            return chain(self.tasks + other.tasks)
+        elif isinstance(other, Signature):
+            return chain(self, other)
+        return NotImplementedError
+
+    def __invert__(self):
+        return self.apply_async().get()
+
+    def __reduce__(self):
+        # for serialization, the task type is lazily loaded,
+        # and not stored in the dict itself.
+        return subtask, (dict(self), )
+
+    def reprcall(self, *args, **kwargs):
+        args, kwargs, _ = self._merge(args, kwargs, {})
+        return reprcall(self["task"], args, kwargs)
+
+    def __repr__(self):
+        return self.reprcall()
+
+    @cached_property
+    def type(self):
+        return self._type or current_app.tasks[self.task]
+    task = _getitem_property("task")
+    args = _getitem_property("args")
+    kwargs = _getitem_property("kwargs")
+    options = _getitem_property("options")
+    subtask_type = _getitem_property("subtask_type")
+
+
+class chain(Signature):
+
+    def __init__(self, *tasks, **options):
+        Signature.__init__(self, "celery.chain", (), {"tasks": tasks}, options)
+        self.tasks = tasks
+        self.subtask_type = "chain"
+
+    @classmethod
+    def from_dict(self, d):
+        return chain(*d["kwargs"]["tasks"], **kwdict(d["options"]))
+
+    def __repr__(self):
+        return " | ".join(map(repr, self.tasks))
+Signature.register_type(chain)
+
+
+class group(Signature):
+
+    def __init__(self, tasks, **options):
+        self.tasks = tasks = [maybe_subtask(t) for t in tasks]
+        Signature.__init__(self, "celery.group", (), {"tasks": tasks}, options)
+        self.subtask_type = "group"
+
+    @classmethod
+    def from_dict(self, d):
+        return group(d["kwargs"]["tasks"], **kwdict(d["options"]))
+
+    def __call__(self, **options):
+        tasks, result = self.type.prepare(options,
+                                map(Signature.clone, self.tasks))
+        return self.type(tasks, result)
+
+    def __repr__(self):
+        return repr(self.tasks)
+Signature.register_type(group)
+
+
+class chord(Signature):
+    Chord = Chord
+
+    def __init__(self, header, body=None, **options):
+        Signature.__init__(self, "celery.chord", (),
+                         {"header": list(header),
+                          "body": maybe_subtask(body)}, options)
+        self.subtask_type = "chord"
+
+    @classmethod
+    def from_dict(self, d):
+        kwargs = d["kwargs"]
+        return chord(kwargs["header"], kwargs["body"], **kwdict(d["options"]))
+
+    def __call__(self, body=None, **options):
+        _chord = self.Chord
+        self.kwargs["body"] = body or self.kwargs["body"]
+        if _chord.app.conf.CELERY_ALWAYS_EAGER:
+            return _chord.apply((), self.kwargs)
+        callback_id = body.options.setdefault("task_id", uuid())
+        _chord(**self.kwargs)
+        return _chord.AsyncResult(callback_id)
+
+    def clone(self, *args, **kwargs):
+        s = Signature.clone(self, *args, **kwargs)
+        # need make copy of body
+        try:
+            kwargs["body"] = kwargs["body"].clone()
+        except KeyError:
+            pass
+        return s
+
+    def link(self, callback):
+        self.body.link(callback)
+        return callback
+
+    def link_error(self, errback):
+        self.body.link_error(errback)
+        return errback
+
+    def __repr__(self):
+        if self.body:
+            return self.body.reprcall(self.tasks)
+        return "<chord without body: %r>" % (self.tasks, )
+
+    @property
+    def tasks(self):
+        return self.kwargs["header"]
+
+    @property
+    def body(self):
+        return self.kwargs["body"]
+Signature.register_type(chord)
+
+
+def subtask(varies, *args, **kwargs):
+    if not (args or kwargs) and isinstance(varies, dict):
+        if isinstance(varies, Signature):
+            return varies.clone()
+        return Signature.from_dict(varies)
+    return Signature(varies, *args, **kwargs)
+
+
+def maybe_subtask(d):
+    return subtask(d) if d is not None and not isinstance(d, Signature) else d

+ 4 - 4
celery/result.py

@@ -127,7 +127,7 @@ class AsyncResult(ResultBase):
 
 
             @task
             @task
             def A(how_many):
             def A(how_many):
-                return TaskSet(B.subtask((i, )) for i in xrange(how_many))
+                return TaskSet(B.s(i) for i in xrange(how_many))
 
 
             @task
             @task
             def B(i):
             def B(i):
@@ -493,9 +493,9 @@ class ResultSet(ResultBase):
                 remaining = timeout - (time.time() - time_start)
                 remaining = timeout - (time.time() - time_start)
                 if remaining <= 0.0:
                 if remaining <= 0.0:
                     raise TimeoutError("join operation timed out")
                     raise TimeoutError("join operation timed out")
-            results.append(result.wait(timeout=remaining,
-                                       propagate=propagate,
-                                       interval=interval))
+            results.append(result.get(timeout=remaining,
+                                      propagate=propagate,
+                                      interval=interval))
         return results
         return results
 
 
     def iter_native(self, timeout=None, interval=None):
     def iter_native(self, timeout=None, interval=None):

+ 2 - 2
celery/task/__init__.py

@@ -26,8 +26,8 @@ old_module, new_module = recreate_module(__name__,
     by_module={
     by_module={
         "celery.task.base":   ["BaseTask", "Task", "PeriodicTask",
         "celery.task.base":   ["BaseTask", "Task", "PeriodicTask",
                                "task", "periodic_task"],
                                "task", "periodic_task"],
-        "celery.task.sets":   ["chain", "group", "TaskSet", "subtask"],
-        "celery.task.chords": ["chord"],
+        "celery.canvas":      ["chain", "group", "chord", "subtask"],
+        "celery.task.sets":   ["TaskSet"],
     },
     },
     base=module,
     base=module,
     __package__="celery.task",
     __package__="celery.task",

+ 2 - 37
celery/task/chords.py

@@ -1,37 +1,2 @@
-# -*- coding: utf-8 -*-
-"""
-    celery.task.chords
-    ~~~~~~~~~~~~~~~~~~
-
-    Chords (task set callbacks).
-
-    :copyright: (c) 2009 - 2012 by Ask Solem.
-    :license: BSD, see LICENSE for more details.
-
-"""
-from __future__ import absolute_import
-
-from celery import current_app
-from celery.local import Proxy
-from celery.task.sets import subtask
-from celery.utils import uuid
-
-Chord = Proxy(lambda: current_app.tasks["celery.chord"])
-
-
-class chord(object):
-    Chord = None
-
-    def __init__(self, tasks, **options):
-        self.tasks = tasks
-        self.options = options
-        self.Chord = self.Chord or current_app.tasks["celery.chord"]
-
-    def __call__(self, body, **options):
-        tid = body.options.setdefault("task_id", uuid())
-        result = self.Chord.apply_async((list(self.tasks), body),
-                                        self.options, **options)
-
-        if self.Chord.app.conf.CELERY_ALWAYS_EAGER:
-            return subtask(body).apply(args=(result.result.join(),))
-        return body.type.AsyncResult(tid)
+# compat module
+from celery.canvas import Chord, chord  # noqa

+ 12 - 153
celery/task/sets.py

@@ -1,154 +1,14 @@
 # -*- coding: utf-8 -*-
 # -*- coding: utf-8 -*-
-"""
-    celery.task.sets
-    ~~~~~~~~~~~~~~~~
-
-    Creating and applying groups of tasks.
-
-    :copyright: (c) 2009 - 2012 by Ask Solem.
-    :license: BSD, see LICENSE for more details.
-
-"""
 from __future__ import absolute_import
 from __future__ import absolute_import
 from __future__ import with_statement
 from __future__ import with_statement
 
 
-from itertools import chain as _chain
-
-from kombu.utils import reprcall
-
-from celery import current_app
-from celery.app import current_task
-from celery.datastructures import AttributeDict
-from celery.utils import cached_property, uuid
-from celery.utils.functional import maybe_list
-from celery.utils.compat import UserList, chain_from_iterable
-
+from celery.app.state import get_current_task
+from celery.canvas import subtask, maybe_subtask  # noqa
+from celery.utils import uuid
+from celery.utils.compat import UserList
 
 
-class subtask(AttributeDict):
-    """Class that wraps the arguments and execution options
-    for a single task invocation.
 
 
-    Used as the parts in a :class:`group` or to safely
-    pass tasks around as callbacks.
-
-    :param task: Either a task class/instance, or the name of a task.
-    :keyword args: Positional arguments to apply.
-    :keyword kwargs: Keyword arguments to apply.
-    :keyword options: Additional options to :meth:`Task.apply_async`.
-
-    Note that if the first argument is a :class:`dict`, the other
-    arguments will be ignored and the values in the dict will be used
-    instead.
-
-        >>> s = subtask("tasks.add", args=(2, 2))
-        >>> subtask(s)
-        {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
-
-    """
-    _type = None
-
-    def __init__(self, task=None, args=None, kwargs=None, options=None,
-                type=None, **ex):
-        init = AttributeDict.__init__
-
-        if isinstance(task, dict):
-            return init(self, task)  # works like dict(d)
-
-        # Also supports using task class/instance instead of string name.
-        try:
-            task_name = task.name
-        except AttributeError:
-            task_name = task
-        else:
-            # need to use super here, since AttributeDict
-            # will add it to dict(self)
-            object.__setattr__(self, "_type", task)
-
-        init(self, task=task_name, args=tuple(args or ()),
-                                   kwargs=dict(kwargs or {}, **ex),
-                                   options=options or {})
-
-    def delay(self, *argmerge, **kwmerge):
-        """Shortcut to `apply_async(argmerge, kwargs)`."""
-        return self.apply_async(args=argmerge, kwargs=kwmerge)
-
-    def apply(self, args=(), kwargs={}, **options):
-        """Apply this task locally."""
-        # For callbacks: extra args are prepended to the stored args.
-        args = tuple(args) + tuple(self.args)
-        kwargs = dict(self.kwargs, **kwargs)
-        options = dict(self.options, **options)
-        return self.type.apply(args, kwargs, **options)
-
-    def clone(self, args=(), kwargs={}, **options):
-        return self.__class__(self.task,
-                              args=tuple(args) + tuple(self.args),
-                              kwargs=dict(self.kwargs, **kwargs),
-                              options=dict(self.options, **options))
-
-    def apply_async(self, args=(), kwargs={}, **options):
-        """Apply this task asynchronously."""
-        # For callbacks: extra args are prepended to the stored args.
-        args = tuple(args) + tuple(self.args)
-        kwargs = dict(self.kwargs, **kwargs)
-        options = dict(self.options, **options)
-        return self.type.apply_async(args, kwargs, **options)
-
-    def link(self, callback):
-        """Add a callback task to be applied if this task
-        executes successfully."""
-        self.options.setdefault("link", []).append(callback)
-        return callback
-
-    def link_error(self, errback):
-        """Add a callback task to be applied if an error occurs
-        while executing this task."""
-        self.options.setdefault("link_error", []).append(errback)
-        return errback
-
-    def flatten_links(self):
-        """Gives a recursive list of dependencies (unchain if you will,
-        but with links intact)."""
-        return list(chain_from_iterable(_chain([[self]],
-                (link.flatten_links()
-                    for link in maybe_list(self.options.get("link")) or []))))
-
-    def __reduce__(self):
-        # for serialization, the task type is lazily loaded,
-        # and not stored in the dict itself.
-        return (self.__class__, (dict(self), ), None)
-
-    def __repr__(self):
-        return reprcall(self["task"], self["args"], self["kwargs"])
-
-    @cached_property
-    def type(self):
-        return self._type or current_app.tasks[self.task]
-
-
-def maybe_subtask(t):
-    if not isinstance(t, subtask):
-        return subtask(t)
-    return t
-
-
-class chain(object):
-
-    def __init__(self, *tasks):
-        self.tasks = tasks
-
-    def apply_async(self, **kwargs):
-        tasks = [task.clone(task_id=uuid(), **kwargs)
-                    for task in self.tasks]
-        reduce(lambda a, b: a.link(b), tasks)
-        tasks[0].apply_async()
-        results = [task.type.AsyncResult(task.options["task_id"])
-                        for task in tasks]
-        reduce(lambda a, b: a.set_parent(b), reversed(results))
-        return results[-1]
-
-
-class group(UserList):
+class TaskSet(UserList):
     """A task containing several subtasks, making it possible
     """A task containing several subtasks, making it possible
     to track how many, or when all of the tasks have been completed.
     to track how many, or when all of the tasks have been completed.
 
 
@@ -157,9 +17,9 @@ class group(UserList):
     Example::
     Example::
 
 
         >>> urls = ("http://cnn.com/rss", "http://bbc.co.uk/rss")
         >>> urls = ("http://cnn.com/rss", "http://bbc.co.uk/rss")
-        >>> g = group(refresh_feed.subtask((url, )) for url in urls)
-        >>> group_result = g.apply_async()
-        >>> list_of_return_values = group_result.join()  # *expensive*
+        >>> s = TaskSet(refresh_feed.s(url) for url in urls)
+        >>> taskset_result = s.apply_async()
+        >>> list_of_return_values = taskset_result.join()  # *expensive*
 
 
     """
     """
     _app = None
     _app = None
@@ -171,7 +31,7 @@ class group(UserList):
 
 
     def apply_async(self, connection=None, connect_timeout=None,
     def apply_async(self, connection=None, connect_timeout=None,
             publisher=None, taskset_id=None):
             publisher=None, taskset_id=None):
-        """Apply group."""
+        """Apply TaskSet."""
         app = self.app
         app = self.app
 
 
         if app.conf.CELERY_ALWAYS_EAGER:
         if app.conf.CELERY_ALWAYS_EAGER:
@@ -187,7 +47,7 @@ class group(UserList):
                     pub.close()
                     pub.close()
 
 
             result = app.TaskSetResult(setid, results)
             result = app.TaskSetResult(setid, results)
-            parent = current_task()
+            parent = get_current_task()
             if parent:
             if parent:
                 parent.request.children.append(result)
                 parent.request.children.append(result)
             return result
             return result
@@ -197,7 +57,7 @@ class group(UserList):
                 for task in self.tasks]
                 for task in self.tasks]
 
 
     def apply(self, taskset_id=None):
     def apply(self, taskset_id=None):
-        """Applies the group locally by blocking until all tasks return."""
+        """Applies the TaskSet locally by blocking until all tasks return."""
         setid = taskset_id or uuid()
         setid = taskset_id or uuid()
         return self.app.TaskSetResult(setid, self._sync_results(setid))
         return self.app.TaskSetResult(setid, self._sync_results(setid))
 
 
@@ -206,7 +66,7 @@ class group(UserList):
 
 
     @property
     @property
     def total(self):
     def total(self):
-        """Number of subtasks in this group."""
+        """Number of subtasks in this TaskSet."""
         return len(self)
         return len(self)
 
 
     def _get_app(self):
     def _get_app(self):
@@ -229,4 +89,3 @@ class group(UserList):
     def _set_Publisher(self, Publisher):
     def _set_Publisher(self, Publisher):
         self._Publisher = Publisher
         self._Publisher = Publisher
     Publisher = property(_get_Publisher, _set_Publisher)
     Publisher = property(_get_Publisher, _set_Publisher)
-TaskSet = group

+ 2 - 2
celery/task/trace.py

@@ -159,8 +159,8 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
     clear_request = task_request.clear
     clear_request = task_request.clear
     on_chord_part_return = backend.on_chord_part_return
     on_chord_part_return = backend.on_chord_part_return
 
 
-    from celery.task import sets
-    subtask = sets.subtask
+    from celery import canvas
+    subtask = canvas.subtask
 
 
     def trace_task(uuid, args, kwargs, request=None):
     def trace_task(uuid, args, kwargs, request=None):
         R = I = None
         R = I = None

+ 1 - 2
celery/tests/functional/tasks.py

@@ -2,8 +2,7 @@ from __future__ import absolute_import
 
 
 import time
 import time
 
 
-from celery.task import task
-from celery.task.sets import subtask
+from celery import task, subtask
 
 
 
 
 @task
 @task

+ 1 - 1
celery/tests/test_app/test_loaders.py

@@ -241,5 +241,5 @@ class test_AppLoader(Case):
     def test_on_worker_init(self):
     def test_on_worker_init(self):
         self.loader.conf["CELERY_IMPORTS"] = ("subprocess", )
         self.loader.conf["CELERY_IMPORTS"] = ("subprocess", )
         sys.modules.pop("subprocess", None)
         sys.modules.pop("subprocess", None)
-        self.loader.on_worker_init()
+        self.loader.init_worker()
         self.assertIn("subprocess", sys.modules)
         self.assertIn("subprocess", sys.modules)

+ 11 - 11
celery/tests/test_task/__init__.py

@@ -304,7 +304,7 @@ class TestCeleryTasks(Case):
 
 
     def test_after_return(self):
     def test_after_return(self):
         task = self.createTask("c.unittest.t.after_return")
         task = self.createTask("c.unittest.t.after_return")
-        task.request.chord = return_True_task.subtask()
+        task.request.chord = return_True_task.s()
         task.after_return("SUCCESS", 1.0, "foobar", (), {}, None)
         task.after_return("SUCCESS", 1.0, "foobar", (), {}, None)
         task.request.clear()
         task.request.clear()
 
 
@@ -387,7 +387,7 @@ class TestTaskSet(Case):
 
 
     @with_eager_tasks
     @with_eager_tasks
     def test_function_taskset(self):
     def test_function_taskset(self):
-        subtasks = [return_True_task.subtask([i]) for i in range(1, 6)]
+        subtasks = [return_True_task.s(i) for i in range(1, 6)]
         ts = task.TaskSet(subtasks)
         ts = task.TaskSet(subtasks)
         res = ts.apply_async()
         res = ts.apply_async()
         self.assertListEqual(res.join(), [True, True, True, True, True])
         self.assertListEqual(res.join(), [True, True, True, True, True])
@@ -395,15 +395,15 @@ class TestTaskSet(Case):
     def test_counter_taskset(self):
     def test_counter_taskset(self):
         increment_counter.count = 0
         increment_counter.count = 0
         ts = task.TaskSet(tasks=[
         ts = task.TaskSet(tasks=[
-            increment_counter.subtask((), {}),
-            increment_counter.subtask((), {"increment_by": 2}),
-            increment_counter.subtask((), {"increment_by": 3}),
-            increment_counter.subtask((), {"increment_by": 4}),
-            increment_counter.subtask((), {"increment_by": 5}),
-            increment_counter.subtask((), {"increment_by": 6}),
-            increment_counter.subtask((), {"increment_by": 7}),
-            increment_counter.subtask((), {"increment_by": 8}),
-            increment_counter.subtask((), {"increment_by": 9}),
+            increment_counter.s(),
+            increment_counter.s(increment_by=2),
+            increment_counter.s(increment_by=3),
+            increment_counter.s(increment_by=4),
+            increment_counter.s(increment_by=5),
+            increment_counter.s(increment_by=6),
+            increment_counter.s(increment_by=7),
+            increment_counter.s(increment_by=8),
+            increment_counter.s(increment_by=9),
         ])
         ])
         self.assertEqual(ts.total, 9)
         self.assertEqual(ts.total, 9)
 
 

+ 39 - 13
celery/tests/test_task/test_chord.py

@@ -4,12 +4,11 @@ from __future__ import with_statement
 from mock import patch
 from mock import patch
 from contextlib import contextmanager
 from contextlib import contextmanager
 
 
+from celery import canvas
 from celery import current_app
 from celery import current_app
 from celery import result
 from celery import result
 from celery.result import AsyncResult, TaskSetResult
 from celery.result import AsyncResult, TaskSetResult
-from celery.task import chords
 from celery.task import task, TaskSet
 from celery.task import task, TaskSet
-from celery.task import sets
 from celery.tests.utils import AppCase, Mock
 from celery.tests.utils import AppCase, Mock
 
 
 passthru = lambda x: x
 passthru = lambda x: x
@@ -63,14 +62,15 @@ class test_unlock_chord_task(AppCase):
 
 
         pts, result.TaskSetResult = result.TaskSetResult, AlwaysReady
         pts, result.TaskSetResult = result.TaskSetResult, AlwaysReady
         callback.apply_async = Mock()
         callback.apply_async = Mock()
+        callback_s = callback.s()
         try:
         try:
             with patch_unlock_retry() as (unlock, retry):
             with patch_unlock_retry() as (unlock, retry):
-                subtask, sets.subtask = sets.subtask, passthru
+                subtask, canvas.maybe_subtask = canvas.maybe_subtask, passthru
                 try:
                 try:
-                    unlock("setid", callback,
+                    unlock("setid", callback_s,
                            result=map(AsyncResult, [1, 2, 3]))
                            result=map(AsyncResult, [1, 2, 3]))
                 finally:
                 finally:
-                    sets.subtask = subtask
+                    canvas.maybe_subtask = subtask
                 callback.apply_async.assert_called_with(([2, 4, 8, 6], ), {})
                 callback.apply_async.assert_called_with(([2, 4, 8, 6], ), {})
                 # did not retry
                 # did not retry
                 self.assertFalse(retry.call_count)
                 self.assertFalse(retry.call_count)
@@ -101,16 +101,42 @@ class test_unlock_chord_task(AppCase):
 
 
 class test_chord(AppCase):
 class test_chord(AppCase):
 
 
-    def test_apply(self):
+    def test_eager(self):
+        from celery import chord
+
+        @task
+        def addX(x, y):
+            return x + y
 
 
-        class chord(chords.chord):
-            Chord = Mock()
+        @task
+        def sumX(n):
+            return sum(n)
 
 
-        x = chord(add.subtask((i, i)) for i in xrange(10))
-        body = add.subtask((2, ))
-        result = x(body)
-        self.assertEqual(result.id, body.options["task_id"])
-        self.assertTrue(chord.Chord.apply_async.call_count)
+        self.app.conf.CELERY_ALWAYS_EAGER = True
+        try:
+            x = chord(addX.s(i, i) for i in xrange(10))
+            body = sumX.s()
+            result = x(body)
+            self.assertEqual(result.get(), sum(i + i for i in xrange(10)))
+        finally:
+            self.app.conf.CELERY_ALWAYS_EAGER = False
+
+    def test_apply(self):
+        self.app.conf.CELERY_ALWAYS_EAGER = False
+        from celery import chord
+
+        m = Mock()
+        m.app.conf.CELERY_ALWAYS_EAGER = False
+        m.AsyncResult = AsyncResult
+        prev, chord.Chord = chord.Chord, m
+        try:
+            x = chord(add.s(i, i) for i in xrange(10))
+            body = add.s(2)
+            result = x(body)
+            self.assertEqual(result.id, body.options["task_id"])
+            self.assertTrue(chord.Chord.called)
+        finally:
+            chord.Chord = prev
 
 
 
 
 class test_Chord_task(AppCase):
 class test_Chord_task(AppCase):

+ 4 - 3
celery/tests/test_task/test_task_sets.py

@@ -6,6 +6,7 @@ import anyjson
 from celery.app import app_or_default
 from celery.app import app_or_default
 from celery.task import Task
 from celery.task import Task
 from celery.task.sets import subtask, TaskSet
 from celery.task.sets import subtask, TaskSet
+from celery.canvas import Signature
 
 
 from celery.tests.utils import Case
 from celery.tests.utils import Case
 
 
@@ -92,7 +93,7 @@ class test_subtask(Case):
 
 
     def test_reduce(self):
     def test_reduce(self):
         s = MockTask.subtask((2, ), {"cache": True})
         s = MockTask.subtask((2, ), {"cache": True})
-        cls, args, _ = s.__reduce__()
+        cls, args = s.__reduce__()
         self.assertDictEqual(dict(cls(*args)), dict(s))
         self.assertDictEqual(dict(cls(*args)), dict(s))
 
 
 
 
@@ -125,7 +126,7 @@ class test_TaskSet(Case):
 
 
         applied = [0]
         applied = [0]
 
 
-        class mocksubtask(subtask):
+        class mocksubtask(Signature):
 
 
             def apply_async(self, *args, **kwargs):
             def apply_async(self, *args, **kwargs):
                 applied[0] += 1
                 applied[0] += 1
@@ -146,7 +147,7 @@ class test_TaskSet(Case):
 
 
         applied = [0]
         applied = [0]
 
 
-        class mocksubtask(subtask):
+        class mocksubtask(Signature):
 
 
             def apply(self, *args, **kwargs):
             def apply(self, *args, **kwargs):
                 applied[0] += 1
                 applied[0] += 1

+ 2 - 2
docs/_ext/celerydocs.py

@@ -22,13 +22,13 @@ APPATTRS = {
 }
 }
 
 
 ABBRS = {
 ABBRS = {
-    "Celery": "celery.app.Celery",
+    "Celery": "celery.Celery",
 }
 }
 
 
 ABBR_EMPTY = {
 ABBR_EMPTY = {
     "exc": "celery.exceptions",
     "exc": "celery.exceptions",
 }
 }
-DEFAULT_EMPTY = "celery.app.Celery"
+DEFAULT_EMPTY = "celery.Celery"
 
 
 
 
 def typeify(S, type):
 def typeify(S, type):

+ 11 - 39
docs/reference/celery.app.rst

@@ -5,50 +5,22 @@
     .. contents::
     .. contents::
         :local:
         :local:
 
 
-    Application
-    -----------
+    Proxies
+    -------
 
 
-    .. autoclass:: Celery
+    .. autodata:: default_app
 
 
-        .. attribute:: main
-
-            Name of the `__main__` module.  Required for standalone scripts.
-
-            If set this will be used instead of `__main__` when automatically
-            generating task names.
-
-        .. autoattribute:: amqp
-        .. autoattribute:: backend
-        .. autoattribute:: loader
-        .. autoattribute:: conf
-        .. autoattribute:: control
-        .. autoattribute:: log
-
-        .. automethod:: config_from_object
-        .. automethod:: config_from_envvar
-        .. automethod:: config_from_cmdline
-
-        .. automethod:: task
-        .. automethod:: create_task_cls
-        .. automethod:: send_task
-        .. autoattribute:: AsyncResult
-        .. autoattribute:: TaskSetResult
-
-        .. automethod:: worker_main
-        .. autoattribute:: Worker
-        .. autoattribute:: Beat
-
-        .. automethod:: broker_connection
-        .. automethod:: with_default_connection
-
-        .. automethod:: mail_admins
-
-        .. automethod:: prepare_config
-
-        .. automethod:: either
 
 
     Functions
     Functions
     ---------
     ---------
 
 
     .. autofunction:: app_or_default
     .. autofunction:: app_or_default
+    .. autofunction:: enable_trace
+    .. autofunction:: disable_trace
+
+
+    Data
+    ----
+
+    .. autodata:: default_loader
 
 

+ 85 - 0
docs/reference/celery.rst

@@ -0,0 +1,85 @@
+.. currentmodule:: celery
+
+.. automodule:: celery
+
+    .. contents::
+        :local:
+
+    Application
+    -----------
+
+    .. autoclass:: Celery
+
+        .. attribute:: main
+
+            Name of the `__main__` module.  Required for standalone scripts.
+
+            If set this will be used instead of `__main__` when automatically
+            generating task names.
+
+        .. autoattribute:: conf
+        .. autoattribute:: amqp
+        .. autoattribute:: backend
+        .. autoattribute:: loader
+        .. autoattribute:: control
+        .. autoattribute:: events
+        .. autoattribute:: log
+        .. autoattribute:: tasks
+        .. autoattribute:: pool
+        .. autoattribute:: Task
+
+        .. automethod:: bugreport
+
+        .. automethod:: config_from_object
+        .. automethod:: config_from_envvar
+        .. automethod:: config_from_cmdline
+
+        .. automethod:: start
+
+        .. automethod:: task
+        .. automethod:: send_task
+        .. autoattribute:: AsyncResult
+        .. autoattribute:: TaskSetResult
+
+        .. automethod:: worker_main
+        .. autoattribute:: Worker
+        .. autoattribute:: WorkController
+        .. autoattribute:: Beat
+
+        .. automethod:: broker_connection
+        .. automethod:: default_connection
+
+        .. automethod:: mail_admins
+
+        .. automethod:: prepare_config
+        .. automethod:: select_queues
+        .. automethod:: now
+
+        .. automethod:: set_current
+        .. automethod:: finalize
+
+        .. autoattribute:: Pickler
+
+    Grouping Tasks
+    --------------
+
+    .. autofunction:: group
+
+    .. autofunction:: chain
+
+    .. autofunction:: chord
+
+    .. autofunction:: subtask
+
+    Proxies
+    -------
+
+    .. data:: current_app
+
+        The currently set app for this thread.
+
+    .. data:: current_task
+
+        The task currently being executed
+        (only set in the worker, or when eager/apply is used).
+

+ 1 - 0
docs/reference/index.rst

@@ -8,6 +8,7 @@
 .. toctree::
 .. toctree::
     :maxdepth: 1
     :maxdepth: 1
 
 
+    celery
     celery.app
     celery.app
     celery.app.task
     celery.app.task
     celery.app.amqp
     celery.app.amqp