|
@@ -6,7 +6,7 @@
|
|
|
|
|
|
.. contents::
|
|
|
:local:
|
|
|
- :depth: 1
|
|
|
+ :depth: 2
|
|
|
|
|
|
.. _canvas-subtasks:
|
|
|
|
|
@@ -15,41 +15,60 @@ Subtasks
|
|
|
|
|
|
.. versionadded:: 2.0
|
|
|
|
|
|
-A :func:`~celery.subtask` wraps the signature of a single task invocation:
|
|
|
-arguments, keyword arguments and execution options.
|
|
|
+We just learned how to call a task using the tasks ``delay`` method,
|
|
|
+and this is often all you need, but sometimes you may want to pass the
|
|
|
+signature of a task invocation to another process or as an argument to another
|
|
|
+function, for this Celery uses something called *subtasks*.
|
|
|
|
|
|
-A subtask for the ``add`` task can be created like this::
|
|
|
+A :func:`~celery.subtask` wraps the arguments, keyword arguments, and execution options
|
|
|
+of a single task invocation in a way such that it can be passed to functions
|
|
|
+or even serialized and sent across the wire.
|
|
|
|
|
|
- >>> from celery import subtask
|
|
|
- >>> subtask(add.name, args=(4, 4))
|
|
|
+- You can create a subtask for the ``add`` task using its name like this::
|
|
|
|
|
|
-or you can create one from the task itself::
|
|
|
+ >>> from celery import subtask
|
|
|
+ >>> subtask('tasks.add', args=(2, 2), countdown=10)
|
|
|
+ tasks.add(2, 2)
|
|
|
|
|
|
- >>> from proj.tasks import add
|
|
|
- >>> add.subtask(args=(4, 4))
|
|
|
+ This subtask has a signature of arity 2 (two arguments): ``(2, 2)``,
|
|
|
+ and sets the countdown execution option to 10.
|
|
|
|
|
|
-It supports the "Calling API" which means it takes the same arguments
|
|
|
-as the :meth:`~@Task.apply_async` method::
|
|
|
+- or you can create one using the task's ``subtask`` method::
|
|
|
|
|
|
- >>> add.apply_async(args, kwargs, **options)
|
|
|
- >>> add.subtask(args, kwargs, **options)
|
|
|
+ >>> add.subtask((2, 2), countdown=10)
|
|
|
+ tasks.add(2, 2)
|
|
|
|
|
|
- >>> add.apply_async((2, 2), countdown=1)
|
|
|
- >>> add.subtask((2, 2), countdown=1)
|
|
|
+- There is also a shortcut using star arguments::
|
|
|
|
|
|
-And like there is a :meth:`~@Task.delay` shortcut for `apply_async`
|
|
|
-there is an :meth:`~@Task.s` shortcut for subtask::
|
|
|
+ >>> add.s(2, 2)
|
|
|
+ tasks.add(2, 2)
|
|
|
|
|
|
- >>> add.s(*args, **kwargs)
|
|
|
+- Keyword arguments are also supported::
|
|
|
|
|
|
- >>> add.s(2, 2)
|
|
|
- proj.tasks.add(2, 2)
|
|
|
+ >>> add.s(2, 2, debug=True)
|
|
|
+ tasks.add(2, 2, debug=True)
|
|
|
|
|
|
- >>> add.s(2, 2) == add.subtask((2, 2))
|
|
|
- True
|
|
|
+- From any subtask instance we can inspect the different fields::
|
|
|
+
|
|
|
+ >>> s = add.subtask((2, 2), {'debug': True}, countdown=10)
|
|
|
+ >>> s.args
|
|
|
+ (2, 2)
|
|
|
+ >>> s.kwargs
|
|
|
+ {'debug': True}
|
|
|
+ >>> s.options
|
|
|
+ {'countdown': 10}
|
|
|
|
|
|
-You can't define options with :meth:`~@Task.s`, but a chaining
|
|
|
-``set`` call takes care of that::
|
|
|
+- It supports the "Calling API" which means it takes the same arguments
|
|
|
+ as the :meth:`~@Task.apply_async` method::
|
|
|
+
|
|
|
+ >>> add.apply_async(args, kwargs, **options)
|
|
|
+ >>> add.subtask(args, kwargs, **options).apply_async()
|
|
|
+
|
|
|
+ >>> add.apply_async((2, 2), countdown=1)
|
|
|
+ >>> add.subtask((2, 2), countdown=1).apply_async()
|
|
|
+
|
|
|
+- You can't define options with :meth:`~@Task.s`, but a chaining
|
|
|
+ ``set`` call takes care of that::
|
|
|
|
|
|
>>> add.s(2, 2).set(countdown=1)
|
|
|
proj.tasks.add(2, 2)
|
|
@@ -92,6 +111,11 @@ You can also clone subtasks to augment these::
|
|
|
>>> s.clone(args=(4, ), kwargs={'debug': True})
|
|
|
proj.tasks.add(2, 4, debug=True)
|
|
|
|
|
|
+Immutability
|
|
|
+------------
|
|
|
+
|
|
|
+.. versionadded:: 3.0
|
|
|
+
|
|
|
Partials are meant to be used with callbacks, any tasks linked or chord
|
|
|
callbacks will be applied with the result of the parent task.
|
|
|
Sometimes you want to specify a callback that does not take
|
|
@@ -109,9 +133,9 @@ so it's not possible to call the subtask with partial args/kwargs.
|
|
|
|
|
|
.. note::
|
|
|
|
|
|
- In this tutorial we use the prefix operator `~` to subtasks.
|
|
|
+ In this tutorial we sometimes use the prefix operator `~` to subtasks.
|
|
|
You probably shouldn't use it in your production code, but it's a handy shortcut
|
|
|
- when testing with the Python shell::
|
|
|
+ when experimenting in the Python shell::
|
|
|
|
|
|
>>> ~subtask
|
|
|
|
|
@@ -124,6 +148,8 @@ so it's not possible to call the subtask with partial args/kwargs.
|
|
|
Callbacks
|
|
|
---------
|
|
|
|
|
|
+.. versionadded:: 3.0
|
|
|
+
|
|
|
Callbacks can be added to any task using the ``link`` argument
|
|
|
to ``apply_async``:
|
|
|
|
|
@@ -153,10 +179,203 @@ arguments::
|
|
|
As expected this will first launch one task calculating :math:`2 + 2`, then
|
|
|
another task calculating :math:`4 + 8`.
|
|
|
|
|
|
+The Primitives
|
|
|
+==============
|
|
|
+
|
|
|
+.. versionadded:: 3.0
|
|
|
+
|
|
|
+.. topic:: Overview
|
|
|
+
|
|
|
+ - ``group``
|
|
|
+
|
|
|
+ The group primitive is a subtask that takes a list of tasks that should
|
|
|
+ be applied in parallel.
|
|
|
+
|
|
|
+ - ``chain``
|
|
|
+
|
|
|
+ The chain primitive lets us link together subtasks so that one is called
|
|
|
+ after the other, essentially forming a *chain* of callbacks.
|
|
|
+
|
|
|
+ - ``chord``
|
|
|
+
|
|
|
+ A chord is just like a group but with a callback. A group consists
|
|
|
+ of a header group and a body, where the body is a task that should execute
|
|
|
+ after all of the tasks in the header is complete.
|
|
|
+
|
|
|
+ - ``map``
|
|
|
+
|
|
|
+ The map primitive works like the built-in ``map`` function, but creates
|
|
|
+ a temporary task where a list of arguments is applied to the task.
|
|
|
+ E.g. ``task.map([1, 2])`` results in a single task
|
|
|
+ being called, appyling the arguments in order to the task function so
|
|
|
+ that the result is::
|
|
|
+
|
|
|
+ res = [task(1), task(2)]
|
|
|
+
|
|
|
+ - ``starmap``
|
|
|
+
|
|
|
+ Works exactly like map except the arguments are applied as ``*args``.
|
|
|
+ For example ``add.starmap([(2, 2), (4, 4)])`` results in a single
|
|
|
+ task calling::
|
|
|
+
|
|
|
+ res = [add(2, 2), add(4, 4)]
|
|
|
+
|
|
|
+ - ``chunks``
|
|
|
+
|
|
|
+ Chunking splits a long list of arguments into parts, e.g the operation::
|
|
|
+
|
|
|
+ >>> add.chunks(zip(xrange(1000), xrange(1000), 10))
|
|
|
+
|
|
|
+ will create 10 tasks that apply 100 items each.
|
|
|
+
|
|
|
+
|
|
|
+The primitives are also subtasks themselves, so that they can be combined
|
|
|
+in any number of ways to compose complex workflows.
|
|
|
+
|
|
|
+Here's some examples::
|
|
|
+
|
|
|
+- Simple chain
|
|
|
+
|
|
|
+ Here's a simple chain, the first task executes passing its return value
|
|
|
+ to the next task in the chain, and so on.
|
|
|
+
|
|
|
+ .. code-block:: python
|
|
|
+
|
|
|
+ # 2 + 2 + 4 + 8
|
|
|
+ >>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
|
|
|
+ >>> res.get()
|
|
|
+ 16
|
|
|
+
|
|
|
+ This can also be written using pipes::
|
|
|
+
|
|
|
+ >>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
|
|
|
+ 16
|
|
|
+
|
|
|
+- Immutable subtasks
|
|
|
+
|
|
|
+ As we have learned signatures can be partial, so that arguments can be
|
|
|
+ added to the existing arguments, but you may not always want that,
|
|
|
+ for example if you don't want the result of the previous task in a chain.
|
|
|
+
|
|
|
+ In that case you can mark the subtask as immutable, so that the arguments
|
|
|
+ cannot be changed::
|
|
|
+
|
|
|
+ >>> add.subtask((2, 2), immutable=True)
|
|
|
+
|
|
|
+ There's also an ``.si`` shortcut for this::
|
|
|
+
|
|
|
+ >>> add.si(2, 2)
|
|
|
+
|
|
|
+ Now we can create a chain of independent tasks instead::
|
|
|
+
|
|
|
+ >>> res = (add.si(2, 2), add.si(4, 4), add.s(8, 8))()
|
|
|
+ >>> res.get()
|
|
|
+ 16
|
|
|
+
|
|
|
+ >>> res.parent.get()
|
|
|
+ 8
|
|
|
+
|
|
|
+ >>> res.parent.parent.get()
|
|
|
+ 4
|
|
|
+
|
|
|
+- Simple group
|
|
|
+
|
|
|
+ We can easily create a group of tasks to execute in parallel::
|
|
|
+
|
|
|
+ >>> res = group(add.s(i, i) for i in xrange(10))()
|
|
|
+ >>> res.get(timeout=1)
|
|
|
+ [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
|
|
|
+
|
|
|
+ - For primitives `.apply_async` is special...
|
|
|
+
|
|
|
+ as it will create a temporary task to apply the tasks in,
|
|
|
+ for example by *applying the group*::
|
|
|
+
|
|
|
+ >>> g = group(add.s(i, i) for i in xrange(10))
|
|
|
+ >>> g() # << applying
|
|
|
+
|
|
|
+ the act of sending the messages for the tasks in the group
|
|
|
+ will happen in the current process,
|
|
|
+ but with ``.apply_async`` this happens in a temporary task
|
|
|
+ instead::
|
|
|
+
|
|
|
+ >>> g = group(add.s(i, i) for i in xrange(10))
|
|
|
+ >>> g.apply_async()
|
|
|
+
|
|
|
+ This is useful because we can e.g. specify a time for the
|
|
|
+ messages in the group to be called::
|
|
|
+
|
|
|
+ >>> g.apply_async(countdown=10)
|
|
|
+
|
|
|
+- Simple chord
|
|
|
+
|
|
|
+ The chord primitive enables us to add callback to be called when
|
|
|
+ all of the tasks in a group has finished executing, which is often
|
|
|
+ required for algorithms that aren't embarrassingly parallel::
|
|
|
+
|
|
|
+ >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
|
|
|
+ >>> res.get()
|
|
|
+ 90
|
|
|
+
|
|
|
+ The above example creates 10 task that all start in parallel,
|
|
|
+ and when all of them is complete the return values is combined
|
|
|
+ into a list and sent to the ``xsum`` task.
|
|
|
+
|
|
|
+ The body of a chord can also be immutable, so that the return value
|
|
|
+ of the group is not passed on to the callback::
|
|
|
+
|
|
|
+ >>> chord((import_contact.s(c) for c in contacts),
|
|
|
+ ... notify_complete.si(import_id)).apply_async()
|
|
|
+
|
|
|
+ Note the use of ``.si`` above which creates an immutable subtask.
|
|
|
+
|
|
|
+- Blow your mind by combining
|
|
|
+
|
|
|
+ Chains can be partial too::
|
|
|
+
|
|
|
+ >>> c1 = (add.s(4) | mul.s(8))
|
|
|
+
|
|
|
+ # (16 + 4) * 8
|
|
|
+ >>> res = c1(16)
|
|
|
+ >>> res.get()
|
|
|
+ 160
|
|
|
+
|
|
|
+ Which means that you can combine chains::
|
|
|
+
|
|
|
+ # ((4 + 16) * 2 + 4) * 8
|
|
|
+ >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))
|
|
|
+
|
|
|
+ >>> res = c2()
|
|
|
+ >>> res.get()
|
|
|
+ 352
|
|
|
+
|
|
|
+ Chaining a group together with another task will automatically
|
|
|
+ upgrade it to be a chord::
|
|
|
+
|
|
|
+ >>> c3 = (group(add.s(i, i) for i in xrange(10) | xsum.s()))
|
|
|
+ >>> res = c3()
|
|
|
+ >>> res.get()
|
|
|
+ 90
|
|
|
+
|
|
|
+ Groups and chords accepts partial arguments too, so in a chain
|
|
|
+ the return value of the previous task is forwarded to all tasks in the group::
|
|
|
+
|
|
|
+
|
|
|
+ >>> new_user_workflow = (create_user.s() | group(
|
|
|
+ ... import_contacts.s(),
|
|
|
+ ... send_welcome_email.s()))
|
|
|
+ ... new_user_workflow.delay(username='artv',
|
|
|
+ ... first='Art',
|
|
|
+ ... last='Vandelay',
|
|
|
+ ... email='art@vandelay.com')
|
|
|
+
|
|
|
+
|
|
|
.. _canvas-chains:
|
|
|
|
|
|
Chaining
|
|
|
-========
|
|
|
+--------
|
|
|
+
|
|
|
+.. versionadded:: 3.0
|
|
|
|
|
|
Tasks can be linked together, which in practice means adding
|
|
|
a callback task::
|
|
@@ -270,7 +489,7 @@ Chains can also be made using the ``|`` (pipe) operator::
|
|
|
>>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()
|
|
|
|
|
|
Graphs
|
|
|
-------
|
|
|
+~~~~~~
|
|
|
|
|
|
In addition you can work with the result graph as a
|
|
|
:class:`~celery.datastructures.DependencyGraph`:
|
|
@@ -301,7 +520,9 @@ and create images::
|
|
|
.. _canvas-group:
|
|
|
|
|
|
Groups
|
|
|
-======
|
|
|
+------
|
|
|
+
|
|
|
+.. versionadded:: 3.0
|
|
|
|
|
|
A group can be used to execute several tasks in parallel.
|
|
|
|
|
@@ -339,7 +560,7 @@ A group is a subclass instance, so it can be used in combination
|
|
|
with other subtasks.
|
|
|
|
|
|
Group Results
|
|
|
--------------
|
|
|
+~~~~~~~~~~~~~
|
|
|
|
|
|
The group task returns a special result too,
|
|
|
this result works just like normal task results, except
|
|
@@ -412,7 +633,7 @@ It supports the following operations:
|
|
|
.. _chords:
|
|
|
|
|
|
Chords
|
|
|
-======
|
|
|
+------
|
|
|
|
|
|
.. versionadded:: 2.3
|
|
|
|
|
@@ -476,7 +697,7 @@ for other tasks <task-synchronous-subtasks>`)
|
|
|
.. _chord-important-notes:
|
|
|
|
|
|
Important Notes
|
|
|
----------------
|
|
|
+~~~~~~~~~~~~~~~
|
|
|
|
|
|
By default the synchronization step is implemented by having a recurring task
|
|
|
poll the completion of the taskset every second, calling the subtask when
|
|
@@ -515,7 +736,7 @@ implemented in other backends (suggestions welcome!).
|
|
|
super(MyTask, self).after_return(*args, **kwargs)
|
|
|
|
|
|
Map & Starmap
|
|
|
-=============
|
|
|
+-------------
|
|
|
|
|
|
:class:`~celery.map` and :class:`~celery.starmap` are built-in tasks
|
|
|
that calls the task for every element in a sequence.
|
|
@@ -565,7 +786,7 @@ to call the starmap after 10 seconds::
|
|
|
.. _chunking:
|
|
|
|
|
|
Chunking
|
|
|
-========
|
|
|
+--------
|
|
|
|
|
|
-- Chunking lets you divide a iterable of work into pieces,
|
|
|
so that if you have one million objects, you can create
|
|
@@ -619,5 +840,3 @@ of one::
|
|
|
|
|
|
which means that the first task will have a countdown of 1, the second
|
|
|
a countdown of 2 and so on.
|
|
|
-
|
|
|
-
|