浏览代码

Added User Guide Section: Sets of tasks, Subtasks and Callbacks.

Ask Solem 15 年之前
父节点
当前提交
d10a68390f
共有 3 个文件被更改,包括 151 次插入3 次删除
  1. 3 3
      celery/task/sets.py
  2. 1 0
      docs/userguide/index.rst
  3. 147 0
      docs/userguide/tasksets.rst

+ 3 - 3
celery/task/sets.py

@@ -86,7 +86,7 @@ class TaskSet(UserList):
         >>> urls = ("http://cnn.com/rss",
         ...         "http://bbc.co.uk/rss",
         ...         "http://xkcd.com/rss")
-        >>> subtasks = [subtask(RefreshFeedTask, kwargs={"feed_url": url})
+        >>> subtasks = [RefreshFeedTask.subtask(kwargs={"feed_url": url})
         ...                 for url in urls]
         >>> taskset = TaskSet(tasks=subtasks)
         >>> taskset_result = taskset.apply_async()
@@ -118,8 +118,8 @@ class TaskSet(UserList):
         Example
 
             >>> ts = TaskSet(tasks=(
-            ...         subtask(RefreshFeedTask, ["http://foo.com/rss"]),
-            ...         subtask(RefreshFeedTask, ["http://bar.com/rss"]),
+            ...         RefreshFeedTask.subtask(["http://foo.com/rss"]),
+            ...         RefreshFeedTask.subtask(["http://bar.com/rss"]),
             ... ))
             >>> result = ts.apply_async()
             >>> result.taskset_id

+ 1 - 0
docs/userguide/index.rst

@@ -11,5 +11,6 @@
     tasks
     executing
     workers
+    tasksets
     remote-tasks
     routing

+ 147 - 0
docs/userguide/tasksets.rst

@@ -0,0 +1,147 @@
+=======================================
+ Sets of tasks, Subtasks and Callbacks
+=======================================
+
+Subtasks
+========
+
+The :class:`~celery.task.sets.subtask` class is used to wrap the arguments and
+execution options for a task invocation. The signature is the following::
+
+    subtask(task_name_or_cls, args, kwargs, options)
+
+For convenience every task also has a shortcut to create subtask instances::
+
+    task.subtask(args, kwargs, options)
+
+:class:`~celery.task.sets.subtask` is actually a subclass of :class:`dict`,
+which means it can be serialized with JSON or other encodings that doesn't
+support complex Python objects.
+
+Also it can be regarded as a type, as the following usage works::
+
+    >>> s = subtask("tasks.add", args=(2, 2), kwargs={})
+
+    >>> subtask(dict(s))  # coerce dict into subtask
+
+This makes it excellent as a means to pass callbacks around to tasks.
+
+Let's improve our ``add`` task so it can accept a callback that
+takes the result as an argument::
+
+    from celery.decorators import task
+    from celery.task.sets import subtask
+
+    @task
+    def add(x, y, callback=None):
+        result = x + y
+        if callback is not None:
+            subtask(callback).apply_async(result)
+        return result
+
+See? :class:`~celery.task.sets.subtask` also knows how it should be applied,
+asynchronously by :meth:`~celery.task.sets.subtask.apply_async`, and
+eagerly by :meth:`~celery.task.sets.subtask.apply`.
+
+The best thing is that any arguments you add to ``subtask.apply_async``,
+will be prepended to the arguments specified by the subtask itself
+Bbut please note: additional keyword arguments will be added to the
+execution options, not the task keyword arguments.
+
+So if you have the subtask::
+
+    >>> add.subtask(args=(10, ), options={"ignore_result": True})
+
+``subtask.apply_async(result)`` becomes::
+
+    >>> add.apply_async(args=(result, 10), ignore_result=True)
+
+and ``subtask.apply_async(result, ignore_result=False)`` becomes::
+
+    >>> add.apply_async(args=(result, 10), ignore_result=False)
+
+
+Now let's execute our new ``add`` task with a callback::
+
+    >>> add.delay(2, 2, callback=add.subtask((8, )))
+
+As expected this will first execute ``2 + 2``, then ``4 + 8``.
+
+TaskSets
+=========
+
+The :class:`~celery.task.sets.TaskSet` enables easy invocation of several
+tasks at once, and is then able to join the results in the same order as the
+tasks were invoked.
+
+The task set works on a list of :class:`~celery.task.sets.subtask`'s::
+
+    >>> from celery.task.sets import TaskSet
+    >>> from tasks import add
+
+    >>> job = TaskSet(tasks=[
+    ...             add.subtask((4, 4)),
+    ...             add.subtask((8, 8)),
+    ...             add.subtask((16, 16)),
+    ...             add.subtask((32, 32)),
+    ... ])
+
+    >>> result = job.apply_async()
+
+    >>> result.ready()  # has all subtasks completed?
+    True
+    >>> result.successful() # was all subtasks successful?
+
+    >>> result.join()
+    [4, 8, 16, 32, 64]
+
+
+TaskSet Results
+===============
+
+When a  :class:`~celery.task.sets.TaskSet` is applied it returns a
+:class:`~celery.result.TaskSetResult` object.
+
+:class:`~celery.result.TaskSetResult` takes a list of
+:class:`~celery.result.AsyncResult` instances and operates on them as if was a
+single task.
+
+The following operations are available:
+
+* :meth:`~celery.result.TaskSetResult.successful`
+
+    Returns :const:`True` if all of the subtasks finished
+    successfully (e.g. did not raise an exception).
+
+* :meth:`~celery.result.TaskSetResult.failed`
+
+    Returns :const:`True` if any of the subtasks failed.
+
+* :meth:`~celery.result.TaskSetResult.waiting`
+
+    Returns :const:`True` if any of the subtasks
+    is not ready.
+
+* meth:`~celery.result.TaskSetResult.ready`
+
+    Return :const:`True` if all of the subtasks
+    are ready.
+
+* meth:`~celery.result.TaskSetResult.completed_count`
+
+    Returns the number of completed subtasks.
+
+* meth:`~celery.result.TaskSetResult.revoke`
+
+    Revoke all of the subtasks.
+
+* meth:`~celery.result.TaskSetResult.iterate`
+
+    Iterate over the return values of the subtasks
+    as they finish, one by one.
+
+* meth:`~celery.result.TaskSetResult.join`
+
+  Gather the results for all of the subtasks,
+  and return a list with them ordered by the order of which they
+  were called.