.. _guide-sets:

=======================================
 Sets of tasks, Subtasks and Callbacks
=======================================

.. contents::
    :local:

.. _sets-subtasks:

Subtasks
========

.. versionadded:: 2.0

The :class:`~celery.task.sets.subtask` type is used to wrap the arguments and
execution options for a single task invocation::

    subtask(task_name_or_cls, args, kwargs, options)

For convenience every task also has a shortcut to create subtasks::

    task.subtask(args, kwargs, options)

:class:`~celery.task.sets.subtask` is actually a :class:`dict` subclass,
which means it can be serialized with JSON or other encodings that don't
support complex Python objects.

It can also be used as a type: a plain :class:`dict` can be coerced back
into a subtask::

    >>> s = subtask("tasks.add", args=(2, 2), kwargs={})
    >>> subtask(dict(s))  # coerce dict into subtask

This makes subtasks well suited for passing callbacks to tasks.

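
As a rough illustration of why the :class:`dict` base class matters, the
sketch below uses a hypothetical ``MiniSubtask`` class (not part of Celery,
and far simpler than the real :class:`~celery.task.sets.subtask`) that keeps
the task name, arguments and options as plain dictionary keys. Because every
value is a basic type, the object survives a JSON round trip, and passing the
decoded dict back to the class restores the type, much like
``subtask(dict(s))`` above:

.. code-block:: python

    import json


    class MiniSubtask(dict):
        """Hypothetical stand-in for celery.task.sets.subtask.

        All state lives in plain dict keys, so serializers that only
        understand basic types (such as JSON) can encode it.
        """

        def __init__(self, task=None, args=None, kwargs=None, options=None):
            if isinstance(task, dict):
                # Coercion: rebuild from an existing (e.g. decoded) dict.
                super().__init__(task)
            else:
                super().__init__(
                    task=task,
                    args=list(args or ()),  # JSON has no tuple type
                    kwargs=dict(kwargs or {}),
                    options=dict(options or {}),
                )


    s = MiniSubtask("tasks.add", args=(2, 2), kwargs={})
    wire = json.dumps(s)                      # encoded like any plain dict
    restored = MiniSubtask(json.loads(wire))  # coerce the dict back
    print(type(restored).__name__, restored == s)  # MiniSubtask True

Arguments are stored as a list rather than a tuple because JSON would turn a
tuple into a list on the way back, and the restored copy would then no longer
compare equal to the original.
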

.. _sets-callbacks:

Callbacks
---------

Let's improve our `add` task so it can accept a callback that
takes the result as an argument::

    from celery.task import task
    from celery.task.sets import subtask

    @task
    def add(x, y, callback=None):
        result = x + y
        if callback is not None:
            # Rebuild the subtask from its (possibly deserialized) dict
            # form and apply it with our result as the first argument.
            subtask(callback).delay(result)
        return result

:class:`~celery.task.sets.subtask` also knows how it should be applied:
asynchronously by :meth:`~celery.task.sets.subtask.delay`, and
eagerly by :meth:`~celery.task.sets.subtask.apply`.

The best thing is that any arguments you pass to `subtask.delay`
will be prepended to the arguments specified by the subtask itself!

If you have the subtask::

    >>> add.subtask(args=(10, ))

`subtask.delay(result)` becomes::

    >>> add.apply_async(args=(result, 10))

...

Now let's execute our new `add` task with a callback::

    >>> add.delay(2, 2, callback=add.subtask((8, )))

As expected this will first launch one task calculating :math:`2 + 2`, then
another task calculating :math:`4 + 8`.

.. _sets-taskset:

Task Sets
=========

The :class:`~celery.task.sets.TaskSet` enables easy invocation of several
tasks at once, and is then able to join the results in the same order as the
tasks were invoked.

A task set takes a list of :class:`~celery.task.sets.subtask` instances::

    >>> from celery.task.sets import TaskSet
    >>> from tasks import add
    >>> job = TaskSet(tasks=[
    ...             add.subtask((4, 4)),
    ...             add.subtask((8, 8)),
    ...             add.subtask((16, 16)),
    ...             add.subtask((32, 32)),
    ... ])
    >>> result = job.apply_async()
    >>> result.ready()  # have all subtasks completed?
    True
    >>> result.successful()  # were all subtasks successful?
    True
    >>> result.join()
    [8, 16, 32, 64]

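
The ordering guarantee of ``join`` is the key point: results come back in the
order the subtasks were listed, not in the order they finish. The following
sketch imitates this with the standard library's :mod:`concurrent.futures`
rather than Celery workers; ``slow_add`` and its artificial delays are
assumptions made only so that later calls tend to finish first:

.. code-block:: python

    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed


    def slow_add(x, y, delay):
        # Hypothetical task body; `delay` only controls when it finishes.
        time.sleep(delay)
        return x + y


    calls = [(4, 4), (8, 8), (16, 16), (32, 32)]

    with ThreadPoolExecutor(max_workers=len(calls)) as pool:
        # Later calls sleep less, so they tend to finish first.
        futures = [
            pool.submit(slow_add, x, y, 0.05 * (len(calls) - i))
            for i, (x, y) in enumerate(calls)
        ]
        # Completion order: depends on timing, typically reversed here.
        finished = [f.result() for f in as_completed(futures)]
        # Invocation order, analogous to TaskSetResult.join().
        joined = [f.result() for f in futures]

    print(joined)  # [8, 16, 32, 64]

Keeping the futures in a list in submission order is what makes the ordered
join cheap here: it simply walks that list and waits on each entry in turn.
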

.. _sets-results:

Results
-------

When a :class:`~celery.task.sets.TaskSet` is applied it returns a
:class:`~celery.result.TaskSetResult` object.

:class:`~celery.result.TaskSetResult` takes a list of
:class:`~celery.result.AsyncResult` instances and operates on them as if they
were a single task.

It supports the following operations:

* :meth:`~celery.result.TaskSetResult.successful`

    Returns :const:`True` if all of the subtasks finished
    successfully (i.e. did not raise an exception).

* :meth:`~celery.result.TaskSetResult.failed`

    Returns :const:`True` if any of the subtasks failed.

* :meth:`~celery.result.TaskSetResult.waiting`

    Returns :const:`True` if any of the subtasks
    is not ready yet.

* :meth:`~celery.result.TaskSetResult.ready`

    Returns :const:`True` if all of the subtasks
    are ready.

* :meth:`~celery.result.TaskSetResult.completed_count`

    Returns the number of completed subtasks.

* :meth:`~celery.result.TaskSetResult.revoke`

    Revokes all of the subtasks.

* :meth:`~celery.result.TaskSetResult.iterate`

    Iterates over the return values of the subtasks
    as they finish, one by one.

* :meth:`~celery.result.TaskSetResult.join`

    Gathers the results of all of the subtasks and
    returns them as a list, in the order in which the
    subtasks were called.

Task set callbacks
------------------

The simplest approach is a task that polls the task set and retries itself
until every subtask is ready. It is easy to set up, but the callback may not
be called until up to `interval` seconds after the last subtask finishes:

.. code-block:: python

    from celery.task import task
    from celery.task.sets import subtask
    from celery.result import TaskSetResult

    @task
    def join_taskset(setid, subtasks, callback, interval=15, max_retries=None):
        # Rebuild the set's result from its id and member AsyncResults.
        result = TaskSetResult(setid, subtasks)
        if result.ready():
            # Every subtask is done: pass their ordered results on.
            return subtask(callback).delay(result.join())
        # Not ready yet: run this task again in `interval` seconds.
        join_taskset.retry(countdown=interval, max_retries=max_retries)

A more responsive approach uses Redis and an atomic counter, so that the last
subtask to finish triggers the callback itself:


.. code-block:: python

    from celery import current_app
    from celery.task import Task, TaskSet, task
    from celery.task.sets import subtask
    from celery.result import TaskSetResult
    from celery.utils import gen_unique_id, cached_property

    from redis import Redis

    class supports_taskset_callback(Task):
        abstract = True
        accept_magic_kwargs = False

        def after_return(self, *args, **kwargs):
            if self.request.taskset:
                callback = self.request.kwargs.get("callback")
                if callback:
                    setid = self.request.taskset
                    # The task set must be saved in advance, so the task
                    # doesn't try to restore it before that happens.  This
                    # is why we use `apply_presaved_taskset` below.
                    result = TaskSetResult.restore(setid)
                    # INCR is atomic, so each finished subtask counts once.
                    current = self.redis.incr("taskset-" + setid)
                    if current >= result.total:
                        subtask(callback).delay(result.join())

        @cached_property
        def redis(self):
            return Redis(host="localhost", port=6379)

    @task(base=supports_taskset_callback)
    def add(x, y, **kwargs):
        return x + y

    @task
    def sum_of(numbers):
        print("TASKSET READY: %r" % (sum(numbers), ))

    def apply_presaved_taskset(tasks):
        r = []
        setid = gen_unique_id()
        for st in tasks:
            # Pre-assign task ids so the set can be saved before it runs.
            uuid = gen_unique_id()
            st.options["task_id"] = uuid
            r.append((st, current_app.AsyncResult(uuid)))
        ts = current_app.TaskSetResult(setid, [res for _, res in r])
        ts.save()
        return TaskSet(tasks=[st for st, _ in r]).apply_async(taskset_id=setid)

    # sum of 100 add tasks
    result = apply_presaved_taskset(
                add.subtask((i, i), {"callback": sum_of.subtask()})
                    for i in xrange(100))

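
The Redis version relies on ``INCR`` being atomic: each finished subtask bumps
the counter exactly once, so only one of them can observe the final count and
fire the callback. The sketch below imitates that pattern in a single process,
with a lock-protected counter standing in for Redis and a thread pool standing
in for the workers. ``AtomicCounter`` and ``run_with_callback`` are
hypothetical names, not Celery or redis-py APIs, and the check uses ``==`` so
the callback fires at most once:

.. code-block:: python

    import threading
    from concurrent.futures import ThreadPoolExecutor


    class AtomicCounter:
        """Hypothetical in-process stand-in for a Redis key used with INCR."""

        def __init__(self):
            self._value = 0
            self._lock = threading.Lock()

        def incr(self):
            # Increment and return the new value as one indivisible step.
            with self._lock:
                self._value += 1
                return self._value


    def run_with_callback(calls, func, callback):
        """Run `func` for every argument tuple, then call `callback` once
        with all results (in invocation order) when the last one finishes."""
        total = len(calls)
        results = [None] * total
        counter = AtomicCounter()
        fired = []

        def worker(index, args):
            # Store the result *before* counting this subtask as done.
            results[index] = func(*args)
            # Plays the role of `after_return`: only the subtask that
            # brings the counter to `total` triggers the callback.
            if counter.incr() == total:
                fired.append(callback(results))

        with ThreadPoolExecutor(max_workers=8) as pool:
            for i, args in enumerate(calls):
                pool.submit(worker, i, args)
        return fired


    # Sum of 100 "add" calls, mirroring the Redis example above.
    fired = run_with_callback([(i, i) for i in range(100)],
                              lambda x, y: x + y, sum)
    print(fired)  # [9900]

Because every worker writes its result before incrementing, the worker that
sees the final count knows all other results are already in place.
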