123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- .. _guide-sets:
- =======================================
- Sets of tasks, Subtasks and Callbacks
- =======================================
- .. contents::
- :local:
- .. _sets-subtasks:
- Subtasks
- ========
- .. versionadded:: 2.0
- The :class:`~celery.task.sets.subtask` type is used to wrap the arguments and
- execution options for a single task invocation::
- subtask(task_name_or_cls, args, kwargs, options)
- For convenience every task also has a shortcut to create subtasks::
- task.subtask(args, kwargs, options)
- :class:`~celery.task.sets.subtask` is actually a :class:`dict` subclass,
- which means it can be serialized with JSON or other encodings that doesn't
- support complex Python objects.
- Also it can be regarded as a type, as the following usage works::
- >>> s = subtask("tasks.add", args=(2, 2), kwargs={})
- >>> subtask(dict(s)) # coerce dict into subtask
- This makes it excellent as a means to pass callbacks around to tasks.
- .. _sets-callbacks:
- Callbacks
- ---------
- Let's improve our `add` task so it can accept a callback that
- takes the result as an argument::
- from celery.decorators import task
- from celery.task.sets import subtask
- @task
- def add(x, y, callback=None):
- result = x + y
- if callback is not None:
- subtask(callback).delay(result)
- return result
- :class:`~celery.task.sets.subtask` also knows how it should be applied,
- asynchronously by :meth:`~celery.task.sets.subtask.delay`, and
- eagerly by :meth:`~celery.task.sets.subtask.apply`.
- The best thing is that any arguments you add to `subtask.delay`,
- will be prepended to the arguments specified by the subtask itself!
- If you have the subtask::
- >>> add.subtask(args=(10, ))
- `subtask.delay(result)` becomes::
- >>> add.apply_async(args=(result, 10))
- ...
- Now let's execute our new `add` task with a callback::
- >>> add.delay(2, 2, callback=add.subtask((8, )))
- As expected this will first launch one task calculating `2 + 2`, then
- another task calculating `4 + 8`.
- .. _sets-taskset:
- Task Sets
- =========
- The :class:`~celery.task.sets.TaskSet` enables easy invocation of several
- tasks at once, and is then able to join the results in the same order as the
- tasks were invoked.
- A task set takes a list of :class:`~celery.task.sets.subtask`'s::
- >>> from celery.task.sets import TaskSet
- >>> from tasks import add
- >>> job = TaskSet(tasks=[
- ... add.subtask((4, 4)),
- ... add.subtask((8, 8)),
- ... add.subtask((16, 16)),
- ... add.subtask((32, 32)),
- ... ])
- >>> result = job.apply_async()
- >>> result.ready() # has all subtasks completed?
- True
- >>> result.successful() # was all subtasks successful?
- >>> result.join()
- [4, 8, 16, 32, 64]
- .. _sets-results:
- Results
- -------
- When a :class:`~celery.task.sets.TaskSet` is applied it returns a
- :class:`~celery.result.TaskSetResult` object.
- :class:`~celery.result.TaskSetResult` takes a list of
- :class:`~celery.result.AsyncResult` instances and operates on them as if it was a
- single task.
- It supports the following operations:
- * :meth:`~celery.result.TaskSetResult.successful`
- Returns :const:`True` if all of the subtasks finished
- successfully (e.g. did not raise an exception).
- * :meth:`~celery.result.TaskSetResult.failed`
- Returns :const:`True` if any of the subtasks failed.
- * :meth:`~celery.result.TaskSetResult.waiting`
- Returns :const:`True` if any of the subtasks
- is not ready yet.
- * :meth:`~celery.result.TaskSetResult.ready`
- Return :const:`True` if all of the subtasks
- are ready.
- * :meth:`~celery.result.TaskSetResult.completed_count`
- Returns the number of completed subtasks.
- * :meth:`~celery.result.TaskSetResult.revoke`
- Revokes all of the subtasks.
- * :meth:`~celery.result.TaskSetResult.iterate`
- Iterates over the return values of the subtasks
- as they finish, one by one.
- * :meth:`~celery.result.TaskSetResult.join`
- Gather the results for all of the subtasks
- and return a list with them ordered by the order of which they
- were called.
|