- .. _guide-sets:
- =======================================
- Sets of tasks, Subtasks and Callbacks
- =======================================
- .. contents::
- :local:
- .. _sets-subtasks:
- Subtasks
- ========
- .. versionadded:: 2.0
- The :class:`~celery.task.sets.subtask` type is used to wrap the arguments and
- execution options for a single task invocation::
- subtask(task_name_or_cls, args, kwargs, options)
- For convenience every task also has a shortcut to create subtasks::
- task.subtask(args, kwargs, options)
- :class:`~celery.task.sets.subtask` is actually a :class:`dict` subclass,
- which means it can be serialized with JSON or other encodings that don't
- support complex Python objects.
- It can also be used as a type, so the following usage works::
- >>> s = subtask("tasks.add", args=(2, 2), kwargs={})
- >>> subtask(dict(s)) # coerce dict into subtask
- This makes it excellent as a means to pass callbacks around to tasks.
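To illustrate why a dict-based signature is easy to pass around, here is a minimal sketch. The `SketchSubtask` class is a hypothetical stand-in written for this example, not Celery's implementation; it only shows that a dict subclass survives a JSON round trip and can be coerced back into the type.

```python
import json


class SketchSubtask(dict):
    """Hypothetical stand-in for a dict-based task signature (not Celery's class)."""

    def __init__(self, task=None, args=None, kwargs=None, options=None):
        if isinstance(task, dict):
            # Coerce a plain dict (for example one decoded from JSON) into the type.
            super().__init__(task)
        else:
            super().__init__(task=task, args=list(args or ()),
                             kwargs=dict(kwargs or {}),
                             options=dict(options or {}))


s = SketchSubtask("tasks.add", args=(2, 2))
wire = json.dumps(s)                        # serializes like any other dict
restored = SketchSubtask(json.loads(wire))  # coerce the decoded dict back
```

Because the signature holds only a task name and plain containers, nothing in it requires pickling.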
- .. _sets-callbacks:
- Callbacks
- ---------
- Let's improve our `add` task so it can accept a callback that
- takes the result as an argument::
- from celery.task import task
- from celery.task.sets import subtask
- @task
- def add(x, y, callback=None):
-     result = x + y
-     if callback is not None:
-         subtask(callback).delay(result)
-     return result
- :class:`~celery.task.sets.subtask` also knows how it should be applied,
- asynchronously by :meth:`~celery.task.sets.subtask.delay`, and
- eagerly by :meth:`~celery.task.sets.subtask.apply`.
- Best of all, any arguments you pass to `subtask.delay`
- are prepended to the arguments specified by the subtask itself.
- If you have the subtask::
- >>> add.subtask(args=(10, ))
- `subtask.delay(result)` becomes::
- >>> add.apply_async(args=(result, 10))
- ...
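The prepending rule can be sketched in plain Python. The `merge_args` helper below is a hypothetical name used only for illustration; it assumes the merge is simple tuple concatenation with the call-time arguments first, as the text describes.

```python
def merge_args(call_args, stored_args):
    """Prepend the arguments given at call time to those stored in the signature."""
    return tuple(call_args) + tuple(stored_args)


def add(x, y):
    return x + y


stored = (10,)   # as in add.subtask(args=(10, ))
result = 4       # value produced by the parent task
final_args = merge_args((result,), stored)
total = add(*final_args)
```

Here `final_args` is `(4, 10)`, so the parent's result fills the first parameter and the stored argument fills the second.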
- Now let's execute our new `add` task with a callback::
- >>> add.delay(2, 2, callback=add.subtask((8, )))
- As expected, this first launches one task calculating :math:`2 + 2`, then
- another task calculating :math:`4 + 8`.
- .. _sets-taskset:
- Task Sets
- =========
- The :class:`~celery.task.sets.TaskSet` enables easy invocation of several
- tasks at once, and is then able to join the results in the same order as the
- tasks were invoked.
- A task set takes a list of :class:`~celery.task.sets.subtask` instances::
- >>> from celery.task.sets import TaskSet
- >>> from tasks import add
- >>> job = TaskSet(tasks=[
- ...     add.subtask((4, 4)),
- ...     add.subtask((8, 8)),
- ...     add.subtask((16, 16)),
- ...     add.subtask((32, 32)),
- ... ])
- >>> result = job.apply_async()
- >>> result.ready() # have all subtasks completed?
- True
- >>> result.successful() # were all subtasks successful?
- True
- >>> result.join()
- [8, 16, 32, 64]
- .. _sets-results:
- Results
- -------
- When a :class:`~celery.task.sets.TaskSet` is applied it returns a
- :class:`~celery.result.TaskSetResult` object.
- :class:`~celery.result.TaskSetResult` takes a list of
- :class:`~celery.result.AsyncResult` instances and operates on them as if they were a
- single task.
- It supports the following operations:
- * :meth:`~celery.result.TaskSetResult.successful`
- Returns :const:`True` if all of the subtasks finished
- successfully (i.e. none of them raised an exception).
- * :meth:`~celery.result.TaskSetResult.failed`
- Returns :const:`True` if any of the subtasks failed.
- * :meth:`~celery.result.TaskSetResult.waiting`
- Returns :const:`True` if any of the subtasks
- is not ready yet.
- * :meth:`~celery.result.TaskSetResult.ready`
- Returns :const:`True` if all of the subtasks
- are ready.
- * :meth:`~celery.result.TaskSetResult.completed_count`
- Returns the number of completed subtasks.
- * :meth:`~celery.result.TaskSetResult.revoke`
- Revokes all of the subtasks.
- * :meth:`~celery.result.TaskSetResult.iterate`
- Iterates over the return values of the subtasks
- as they finish, one by one.
- * :meth:`~celery.result.TaskSetResult.join`
- Gathers the results of all of the subtasks
- and returns them as a list, in the order in which the subtasks
- were called.
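The semantics of these operations can be sketched with the standard library. The `SketchSetResult` class below is a hypothetical analogue built on `concurrent.futures`, not Celery's `TaskSetResult`; it assumes that a "completed" subtask is one that finished without raising.

```python
from concurrent.futures import ThreadPoolExecutor


class SketchSetResult:
    """Hypothetical result wrapper over a list of futures (not Celery's class)."""

    def __init__(self, futures):
        self.futures = list(futures)

    def ready(self):
        return all(f.done() for f in self.futures)

    def waiting(self):
        return any(not f.done() for f in self.futures)

    def successful(self):
        return all(f.done() and f.exception() is None for f in self.futures)

    def failed(self):
        return any(f.done() and f.exception() is not None for f in self.futures)

    def completed_count(self):
        # Assumption: count only members that finished without raising.
        return sum(1 for f in self.futures
                   if f.done() and f.exception() is None)

    def join(self):
        # Block on each member in submission order, so results keep call order.
        return [f.result() for f in self.futures]


def add(x, y):
    return x + y


with ThreadPoolExecutor(max_workers=4) as pool:
    result = SketchSetResult(pool.submit(add, n, n) for n in (4, 8, 16, 32))
    joined = result.join()
```

Note that `join` preserves call order because it waits on the members in the order they were submitted, regardless of which one finishes first.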
- .. _chords:
- Chords
- ======
- A chord is a task that only executes after all of the tasks in a taskset have
- finished executing.
- Let's calculate the sum of the expression
- :math:`1 + 1 + 2 + 2 + 3 + 3 ... n + n` with :math:`n = 99`.
- First we need two tasks, :func:`add` and :func:`tsum` (:func:`sum` is
- already a standard function):
- .. code-block:: python
- from celery.task import task
- @task
- def add(x, y):
-     return x + y
- @task
- def tsum(numbers):
-     return sum(numbers)
- Now we can use a chord to calculate each addition step in parallel, and then
- get the sum of the resulting numbers::
- >>> from celery.task import chord
- >>> from tasks import add, tsum
- >>> chord(add.subtask((i, i))
- ... for i in xrange(100))(tsum.subtask()).get()
- 9900
- This is obviously a very contrived example: the overhead of messaging and
- synchronization makes this a lot slower than its Python counterpart::
- sum(i + i for i in xrange(100))
- The synchronization step is costly, so you should avoid using chords as much
- as possible. Still, the chord is a powerful primitive to have in your toolbox
- as synchronization is a required step for many parallel algorithms.
- Let's break the chord expression down::
- >>> callback = tsum.subtask()
- >>> header = [add.subtask((i, i)) for i in xrange(100)]
- >>> result = chord(header)(callback)
- >>> result.get()
- 9900
- Remember, the callback can only be executed after all of the tasks in the
- header have returned. Each step in the header is executed as a task, in
- parallel, possibly on different nodes. The callback is then applied with
- a list containing the return value of each task in the header. The task id returned by
- :meth:`chord` is the id of the callback, so you can wait for it to complete
- and get the final return value (but remember to :ref:`never have a task wait
- for other tasks <task-synchronous-subtasks>`).
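The header/callback relationship can be sketched locally without a broker. The `sketch_chord` function below is a hypothetical, in-process analogue that uses a thread pool in place of workers; it is not how Celery dispatches tasks, but it shows the same shape: run the header in parallel, wait for all of it, then apply the callback to the list of results.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def tsum(numbers):
    return sum(numbers)


def sketch_chord(header, callback, max_workers=8):
    """Run every (func, args) pair in `header`, then pass all results to `callback`."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        # Synchronization point: wait for every header call before the callback.
        results = [f.result() for f in futures]
    return callback(results)


header = [(add, (i, i)) for i in range(100)]
total = sketch_chord(header, tsum)
```

The list comprehension that collects `results` is the costly synchronization step: nothing downstream can start until the slowest header call has returned.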
- .. _chord-important-notes:
- Important Notes
- ---------------
- By default the synchronization step is implemented by having a recurring task
- poll the completion of the taskset every second, applying the callback subtask when
- the taskset is ready.
- Example implementation:
- .. code-block:: python
- def unlock_chord(taskset, callback, interval=1, max_retries=None):
-     if taskset.ready():
-         return subtask(callback).delay(taskset.join())
-     unlock_chord.retry(countdown=interval, max_retries=max_retries)
- This is used by all result backends except Redis, which increments a
- counter after each task in the header, then applies the callback when the
- counter reaches the number of tasks in the set.
- The Redis approach is a much better solution, but not easily implemented
- in other backends (suggestions welcome!).
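The counter-based idea can be sketched in a single process. The `CounterChord` class below is hypothetical: a lock-protected integer stands in for the backend's atomic counter, and a thread pool stands in for the workers. It only illustrates that the task completing the set is the one that applies the callback, so no polling is needed.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class CounterChord:
    """Hypothetical counter-based synchronization (in-process stand-in)."""

    def __init__(self, total, callback):
        self.total = total
        self.callback = callback
        self.results = [None] * total
        self.count = 0
        self.lock = threading.Lock()
        self.final = None

    def on_task_done(self, index, value):
        # Called after each header task: store the result and bump the counter.
        with self.lock:
            self.results[index] = value
            self.count += 1
            last = self.count >= self.total
        if last:
            # Only the task that completes the set applies the callback.
            self.final = self.callback(self.results)


def add_and_report(chord, index, x, y):
    chord.on_task_done(index, x + y)


chord = CounterChord(total=100, callback=sum)
with ThreadPoolExecutor(max_workers=8) as pool:
    for i in range(100):
        pool.submit(add_and_report, chord, i, i, i)
```

Compared with polling, the callback fires as soon as the last header task reports in, and no recurring task is needed.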
- .. note::
- If you are using chords with the Redis result backend and also overriding
- the :meth:`Task.after_return` method, you need to make sure to call the
- super method or else the chord callback will not be applied.
- .. code-block:: python
- def after_return(self, *args, **kwargs):
-     do_something()
-     super(MyTask, self).after_return(*args, **kwargs)
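Why the super call matters can be shown with plain classes. Everything below is hypothetical (`BaseTask`, `ForgetfulTask`, `CooperativeTask`); the sketch assumes only that the base class does its chord bookkeeping inside `after_return`, as the note describes.

```python
class BaseTask:
    """Hypothetical base class whose after_return hook does the chord bookkeeping."""

    def __init__(self):
        self.events = []

    def after_return(self, *args, **kwargs):
        self.events.append("chord bookkeeping")


class ForgetfulTask(BaseTask):
    def after_return(self, *args, **kwargs):
        self.events.append("custom work")  # base hook never runs


class CooperativeTask(BaseTask):
    def after_return(self, *args, **kwargs):
        self.events.append("custom work")
        super().after_return(*args, **kwargs)


forgetful, cooperative = ForgetfulTask(), CooperativeTask()
forgetful.after_return("SUCCESS")
cooperative.after_return("SUCCESS")
```

Only `cooperative.events` contains the bookkeeping entry; the override that skips the super call silently drops it.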