|
@@ -26,7 +26,9 @@ A :func:`~celery.signature` wraps the arguments, keyword arguments, and executio
|
|
|
of a single task invocation in a way such that it can be passed to functions
|
|
|
or even serialized and sent across the wire.
|
|
|
|
|
|
-- You can create a signature for the ``add`` task using its name like this::
|
|
|
+- You can create a signature for the ``add`` task using its name like this:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> from celery import signature
|
|
|
>>> signature('tasks.add', args=(2, 2), countdown=10)
|
|
@@ -35,22 +37,30 @@ or even serialized and sent across the wire.
|
|
|
This task has a signature of arity 2 (two arguments): ``(2, 2)``,
|
|
|
and sets the countdown execution option to 10.
|
|
|
|
|
|
-- or you can create one using the task's ``signature`` method::
|
|
|
+- or you can create one using the task's ``signature`` method:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> add.signature((2, 2), countdown=10)
|
|
|
tasks.add(2, 2)
|
|
|
|
|
|
-- There is also a shortcut using star arguments::
|
|
|
+- There is also a shortcut using star arguments:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> add.s(2, 2)
|
|
|
tasks.add(2, 2)
|
|
|
|
|
|
-- Keyword arguments are also supported::
|
|
|
+- Keyword arguments are also supported:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> add.s(2, 2, debug=True)
|
|
|
tasks.add(2, 2, debug=True)
|
|
|
|
|
|
-- From any signature instance you can inspect the different fields::
|
|
|
+- From any signature instance you can inspect the different fields:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> s = add.signature((2, 2), {'debug': True}, countdown=10)
|
|
|
>>> s.args
|
|
@@ -63,20 +73,27 @@ or even serialized and sent across the wire.
|
|
|
- It supports the "Calling API" which means it supports ``delay`` and
|
|
|
``apply_async`` or being called directly.
|
|
|
|
|
|
- Calling the signature will execute the task inline in the current process::
|
|
|
+ Calling the signature will execute the task inline in the current process:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> add(2, 2)
|
|
|
4
|
|
|
>>> add.s(2, 2)()
|
|
|
4
|
|
|
|
|
|
- ``delay`` is our beloved shortcut to ``apply_async`` taking star-arguments::
|
|
|
+ ``delay`` is our beloved shortcut to ``apply_async`` taking star-arguments:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> result = add.delay(2, 2)
|
|
|
>>> result.get()
|
|
|
4
|
|
|
|
|
|
- ``apply_async`` takes the same arguments as the :meth:`Task.apply_async <@Task.apply_async>` method::
|
|
|
+ ``apply_async`` takes the same arguments as the
|
|
|
+ :meth:`Task.apply_async <@Task.apply_async>` method:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> add.apply_async(args, kwargs, **options)
|
|
|
>>> add.signature(args, kwargs, **options).apply_async()
|
|
@@ -85,20 +102,26 @@ or even serialized and sent across the wire.
|
|
|
>>> add.signature((2, 2), countdown=1).apply_async()
|
|
|
|
|
|
- You can't define options with :meth:`~@Task.s`, but a chaining
|
|
|
- ``set`` call takes care of that::
|
|
|
+ ``set`` call takes care of that:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
- >>> add.s(2, 2).set(countdown=1)
|
|
|
- proj.tasks.add(2, 2)
|
|
|
+ >>> add.s(2, 2).set(countdown=1)
|
|
|
+ proj.tasks.add(2, 2)
|
|
|
|
|
|
Partials
|
|
|
--------
|
|
|
|
|
|
-With a signature, you can execute the task in a worker::
|
|
|
+With a signature, you can execute the task in a worker:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> add.s(2, 2).delay()
|
|
|
>>> add.s(2, 2).apply_async(countdown=1)
|
|
|
|
|
|
-Or you can call it directly in the current process::
|
|
|
+Or you can call it directly in the current process:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> add.s(2, 2)()
|
|
|
4
|
|
@@ -106,27 +129,35 @@ Or you can call it directly in the current process::
|
|
|
Specifying additional args, kwargs or options to ``apply_async``/``delay``
|
|
|
creates partials:
|
|
|
|
|
|
-- Any arguments added will be prepended to the args in the signature::
|
|
|
+- Any arguments added will be prepended to the args in the signature:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
- >>> partial = add.s(2) # incomplete signature
|
|
|
- >>> partial.delay(4) # 4 + 2
|
|
|
- >>> partial.apply_async((4,)) # same
|
|
|
+ >>> partial = add.s(2) # incomplete signature
|
|
|
+ >>> partial.delay(4) # 4 + 2
|
|
|
+ >>> partial.apply_async((4,)) # same
|
|
|
|
|
|
- Any keyword arguments added will be merged with the kwargs in the signature,
|
|
|
- with the new keyword arguments taking precedence::
|
|
|
+ with the new keyword arguments taking precedence:
|
|
|
|
|
|
- >>> s = add.s(2, 2)
|
|
|
- >>> s.delay(debug=True) # -> add(2, 2, debug=True)
|
|
|
- >>> s.apply_async(kwargs={'debug': True}) # same
|
|
|
+ .. code-block:: pycon
|
|
|
+
|
|
|
+ >>> s = add.s(2, 2)
|
|
|
+ >>> s.delay(debug=True) # -> add(2, 2, debug=True)
|
|
|
+ >>> s.apply_async(kwargs={'debug': True}) # same
|
|
|
|
|
|
- Any options added will be merged with the options in the signature,
|
|
|
- with the new options taking precedence::
|
|
|
+ with the new options taking precedence:
|
|
|
|
|
|
- >>> s = add.signature((2, 2), countdown=10)
|
|
|
- >>> s.apply_async(countdown=1) # countdown is now 1
|
|
|
+ .. code-block:: pycon
|
|
|
+
|
|
|
+ >>> s = add.signature((2, 2), countdown=10)
|
|
|
+ >>> s.apply_async(countdown=1) # countdown is now 1
|
|
|
|
|
|
You can also clone signatures to create derivatives:
|
|
|
|
|
|
+.. code-block:: pycon
|
|
|
+
|
|
|
>>> s = add.s(2)
|
|
|
proj.tasks.add(2)
|
|
|
|
|
@@ -142,11 +173,15 @@ Partials are meant to be used with callbacks, any tasks linked or chord
|
|
|
callbacks will be applied with the result of the parent task.
|
|
|
Sometimes you want to specify a callback that does not take
|
|
|
additional arguments, and in that case you can set the signature
|
|
|
-to be immutable::
|
|
|
+to be immutable:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))
|
|
|
|
|
|
-The ``.si()`` shortcut can also be used to create immutable signatures::
|
|
|
+The ``.si()`` shortcut can also be used to create immutable signatures:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> add.apply_async((2, 2), link=reset_buffers.si())
|
|
|
|
|
@@ -157,7 +192,9 @@ so it's not possible to call the signature with partial args/kwargs.
|
|
|
|
|
|
In this tutorial I sometimes use the prefix operator `~` to signatures.
|
|
|
You probably shouldn't use it in your production code, but it's a handy shortcut
|
|
|
- when experimenting in the Python shell::
|
|
|
+ when experimenting in the Python shell:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> ~sig
|
|
|
|
|
@@ -173,7 +210,9 @@ Callbacks
|
|
|
.. versionadded:: 3.0
|
|
|
|
|
|
Callbacks can be added to any task using the ``link`` argument
|
|
|
-to ``apply_async``::
|
|
|
+to ``apply_async``:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
add.apply_async((2, 2), link=other_task.s())
|
|
|
|
|
@@ -183,18 +222,24 @@ and it will be applied with the return value of the parent task as argument.
|
|
|
As I mentioned earlier, any arguments you add to a signature,
|
|
|
will be prepended to the arguments specified by the signature itself!
|
|
|
|
|
|
-If you have the signature::
|
|
|
+If you have the signature:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> sig = add.s(10)
|
|
|
|
|
|
-then `sig.delay(result)` becomes::
|
|
|
+then `sig.delay(result)` becomes:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> add.apply_async(args=(result, 10))
|
|
|
|
|
|
...
|
|
|
|
|
|
Now let's call our ``add`` task with a callback using partial
|
|
|
-arguments::
|
|
|
+arguments:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> add.apply_async((2, 2), link=add.s(8))
|
|
|
|
|
@@ -230,7 +275,9 @@ The Primitives
|
|
|
a temporary task where a list of arguments is applied to the task.
|
|
|
E.g. ``task.map([1, 2])`` results in a single task
|
|
|
being called, applying the arguments in order to the task function so
|
|
|
- that the result is::
|
|
|
+ that the result is:
|
|
|
+
|
|
|
+ .. code-block:: python
|
|
|
|
|
|
res = [task(1), task(2)]
|
|
|
|
|
@@ -238,13 +285,17 @@ The Primitives
|
|
|
|
|
|
Works exactly like map except the arguments are applied as ``*args``.
|
|
|
For example ``add.starmap([(2, 2), (4, 4)])`` results in a single
|
|
|
- task calling::
|
|
|
+ task calling:
|
|
|
+
|
|
|
+ .. code-block:: python
|
|
|
|
|
|
res = [add(2, 2), add(4, 4)]
|
|
|
|
|
|
- ``chunks``
|
|
|
|
|
|
- Chunking splits a long list of arguments into parts, e.g the operation::
|
|
|
+ Chunking splits a long list of arguments into parts, e.g the operation:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> items = zip(xrange(1000), xrange(1000)) # 1000 items
|
|
|
>>> add.chunks(items, 10)
|
|
@@ -263,16 +314,18 @@ Here's some examples:
|
|
|
Here's a simple chain, the first task executes passing its return value
|
|
|
to the next task in the chain, and so on.
|
|
|
|
|
|
- .. code-block:: python
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> from celery import chain
|
|
|
|
|
|
- # 2 + 2 + 4 + 8
|
|
|
+ >>> # 2 + 2 + 4 + 8
|
|
|
>>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
|
|
|
>>> res.get()
|
|
|
16
|
|
|
|
|
|
- This can also be written using pipes::
|
|
|
+ This can also be written using pipes:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
|
|
|
16
|
|
@@ -284,15 +337,21 @@ Here's some examples:
|
|
|
for example if you don't want the result of the previous task in a chain.
|
|
|
|
|
|
In that case you can mark the signature as immutable, so that the arguments
|
|
|
- cannot be changed::
|
|
|
+ cannot be changed:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> add.signature((2, 2), immutable=True)
|
|
|
|
|
|
- There's also an ``.si`` shortcut for this::
|
|
|
+ There's also an ``.si`` shortcut for this:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> add.si(2, 2)
|
|
|
|
|
|
- Now you can create a chain of independent tasks instead::
|
|
|
+ Now you can create a chain of independent tasks instead:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> res = (add.si(2, 2) | add.si(4, 4) | add.s(8, 8))()
|
|
|
>>> res.get()
|
|
@@ -306,7 +365,9 @@ Here's some examples:
|
|
|
|
|
|
- Simple group
|
|
|
|
|
|
- You can easily create a group of tasks to execute in parallel::
|
|
|
+ You can easily create a group of tasks to execute in parallel:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> from celery import group
|
|
|
>>> res = group(add.s(i, i) for i in xrange(10))()
|
|
@@ -317,7 +378,9 @@ Here's some examples:
|
|
|
|
|
|
The chord primitive enables us to add callback to be called when
|
|
|
all of the tasks in a group have finished executing, which is often
|
|
|
- required for algorithms that aren't embarrassingly parallel::
|
|
|
+ required for algorithms that aren't embarrassingly parallel:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> from celery import chord
|
|
|
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
|
|
@@ -329,7 +392,9 @@ Here's some examples:
|
|
|
into a list and sent to the ``xsum`` task.
|
|
|
|
|
|
The body of a chord can also be immutable, so that the return value
|
|
|
- of the group is not passed on to the callback::
|
|
|
+ of the group is not passed on to the callback:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> chord((import_contact.s(c) for c in contacts),
|
|
|
... notify_complete.si(import_id)).apply_async()
|
|
@@ -338,7 +403,9 @@ Here's some examples:
|
|
|
|
|
|
- Blow your mind by combining
|
|
|
|
|
|
- Chains can be partial too::
|
|
|
+ Chains can be partial too:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> c1 = (add.s(4) | mul.s(8))
|
|
|
|
|
@@ -347,7 +414,9 @@ Here's some examples:
|
|
|
>>> res.get()
|
|
|
160
|
|
|
|
|
|
- Which means that you can combine chains::
|
|
|
+ Which means that you can combine chains:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
# ((4 + 16) * 2 + 4) * 8
|
|
|
>>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))
|
|
@@ -357,7 +426,9 @@ Here's some examples:
|
|
|
352
|
|
|
|
|
|
Chaining a group together with another task will automatically
|
|
|
- upgrade it to be a chord::
|
|
|
+ upgrade it to be a chord:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> c3 = (group(add.s(i, i) for i in xrange(10)) | xsum.s())
|
|
|
>>> res = c3()
|
|
@@ -365,7 +436,9 @@ Here's some examples:
|
|
|
90
|
|
|
|
|
|
Groups and chords accepts partial arguments too, so in a chain
|
|
|
- the return value of the previous task is forwarded to all tasks in the group::
|
|
|
+ the return value of the previous task is forwarded to all tasks in the group:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
|
|
|
>>> new_user_workflow = (create_user.s() | group(
|
|
@@ -378,7 +451,9 @@ Here's some examples:
|
|
|
|
|
|
|
|
|
If you don't want to forward arguments to the group then
|
|
|
- you can make the signatures in the group immutable::
|
|
|
+ you can make the signatures in the group immutable:
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
|
|
|
>>> res = (add.s(4, 4) | group(add.si(i, i) for i in xrange(10)))()
|
|
|
>>> res.get()
|
|
@@ -406,7 +481,9 @@ Chains
|
|
|
.. versionadded:: 3.0
|
|
|
|
|
|
Tasks can be linked together, which in practice means adding
|
|
|
-a callback task::
|
|
|
+a callback task:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> res = add.apply_async((2, 2), link=mul.s(16))
|
|
|
>>> res.get()
|
|
@@ -417,7 +494,9 @@ task as the first argument, which in the above case will result
|
|
|
in ``mul(4, 16)`` since the result is 4.
|
|
|
|
|
|
The results will keep track of any subtasks called by the original task,
|
|
|
-and this can be accessed from the result instance::
|
|
|
+and this can be accessed from the result instance:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> res.children
|
|
|
[<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]
|
|
@@ -427,7 +506,9 @@ and this can be accessed from the result instance::
|
|
|
|
|
|
The result instance also has a :meth:`~@AsyncResult.collect` method
|
|
|
that treats the result as a graph, enabling you to iterate over
|
|
|
-the results::
|
|
|
+the results:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> list(res.collect())
|
|
|
[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
|
|
@@ -437,19 +518,25 @@ By default :meth:`~@AsyncResult.collect` will raise an
|
|
|
:exc:`~@IncompleteStream` exception if the graph is not fully
|
|
|
formed (one of the tasks has not completed yet),
|
|
|
but you can get an intermediate representation of the graph
|
|
|
-too::
|
|
|
+too:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> for result, value in res.collect(intermediate=True)):
|
|
|
....
|
|
|
|
|
|
You can link together as many tasks as you like,
|
|
|
-and signatures can be linked too::
|
|
|
+and signatures can be linked too:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> s = add.s(2, 2)
|
|
|
>>> s.link(mul.s(4))
|
|
|
>>> s.link(log_result.s())
|
|
|
|
|
|
-You can also add *error callbacks* using the ``link_error`` argument::
|
|
|
+You can also add *error callbacks* using the ``link_error`` argument:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> add.apply_async((2, 2), link_error=log_error.s())
|
|
|
|
|
@@ -476,25 +563,29 @@ To make it even easier to link tasks together there is
|
|
|
a special signature called :class:`~celery.chain` that lets
|
|
|
you chain tasks together:
|
|
|
|
|
|
-.. code-block:: python
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> from celery import chain
|
|
|
>>> from proj.tasks import add, mul
|
|
|
|
|
|
- # (4 + 4) * 8 * 10
|
|
|
+ >>> # (4 + 4) * 8 * 10
|
|
|
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
|
|
|
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)
|
|
|
|
|
|
|
|
|
Calling the chain will call the tasks in the current process
|
|
|
-and return the result of the last task in the chain::
|
|
|
+and return the result of the last task in the chain:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
|
|
|
>>> res.get()
|
|
|
640
|
|
|
|
|
|
It also sets ``parent`` attributes so that you can
|
|
|
-work your way up the chain to get intermediate results::
|
|
|
+work your way up the chain to get intermediate results:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> res.parent.get()
|
|
|
64
|
|
@@ -506,7 +597,9 @@ work your way up the chain to get intermediate results::
|
|
|
<AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>
|
|
|
|
|
|
|
|
|
-Chains can also be made using the ``|`` (pipe) operator::
|
|
|
+Chains can also be made using the ``|`` (pipe) operator:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()
|
|
|
|
|
@@ -516,7 +609,7 @@ Graphs
|
|
|
In addition you can work with the result graph as a
|
|
|
:class:`~celery.datastructures.DependencyGraph`:
|
|
|
|
|
|
-.. code-block:: python
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
|
|
|
|
|
@@ -527,7 +620,9 @@ In addition you can work with the result graph as a
|
|
|
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
|
|
|
463afec2-5ed4-4036-b22d-ba067ec64f52(0)
|
|
|
|
|
|
-You can even convert these graphs to *dot* format::
|
|
|
+You can even convert these graphs to *dot* format:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> with open('graph.dot', 'w') as fh:
|
|
|
... res.parent.parent.graph.to_dot(fh)
|
|
@@ -535,7 +630,7 @@ You can even convert these graphs to *dot* format::
|
|
|
|
|
|
and create images:
|
|
|
|
|
|
-.. code-block:: bash
|
|
|
+.. code-block:: console
|
|
|
|
|
|
$ dot -Tpng graph.dot -o graph.png
|
|
|
|
|
@@ -550,7 +645,9 @@ Groups
|
|
|
|
|
|
A group can be used to execute several tasks in parallel.
|
|
|
|
|
|
-The :class:`~celery.group` function takes a list of signatures::
|
|
|
+The :class:`~celery.group` function takes a list of signatures:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> from celery import group
|
|
|
>>> from proj.tasks import add
|
|
@@ -561,14 +658,18 @@ The :class:`~celery.group` function takes a list of signatures::
|
|
|
If you **call** the group, the tasks will be applied
|
|
|
one after one in the current process, and a :class:`~celery.result.GroupResult`
|
|
|
instance is returned which can be used to keep track of the results,
|
|
|
-or tell how many tasks are ready and so on::
|
|
|
+or tell how many tasks are ready and so on:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> g = group(add.s(2, 2), add.s(4, 4))
|
|
|
>>> res = g()
|
|
|
>>> res.get()
|
|
|
[4, 8]
|
|
|
|
|
|
-Group also supports iterators::
|
|
|
+Group also supports iterators:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> group(add.s(i, i) for i in xrange(100))()
|
|
|
|
|
@@ -580,7 +681,9 @@ Group Results
|
|
|
|
|
|
The group task returns a special result too,
|
|
|
this result works just like normal task results, except
|
|
|
-that it works on the group as a whole::
|
|
|
+that it works on the group as a whole:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> from celery import group
|
|
|
>>> from tasks import add
|
|
@@ -653,7 +756,7 @@ Chords
|
|
|
Tasks used within a chord must *not* ignore their results. If the result
|
|
|
backend is disabled for *any* task (header or body) in your chord you
|
|
|
should read ":ref:`chord-important-notes`".
|
|
|
-
|
|
|
+
|
|
|
|
|
|
A chord is a task that only executes after all of the tasks in a group have
|
|
|
finished executing.
|
|
@@ -677,7 +780,9 @@ already a standard function):
|
|
|
|
|
|
|
|
|
Now you can use a chord to calculate each addition step in parallel, and then
|
|
|
-get the sum of the resulting numbers::
|
|
|
+get the sum of the resulting numbers:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> from celery import chord
|
|
|
>>> from tasks import add, tsum
|
|
@@ -688,9 +793,11 @@ get the sum of the resulting numbers::
|
|
|
|
|
|
|
|
|
This is obviously a very contrived example, the overhead of messaging and
|
|
|
-synchronization makes this a lot slower than its Python counterpart::
|
|
|
+synchronization makes this a lot slower than its Python counterpart:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
- sum(i + i for i in xrange(100))
|
|
|
+ >>> sum(i + i for i in xrange(100))
|
|
|
|
|
|
The synchronization step is costly, so you should avoid using chords as much
|
|
|
as possible. Still, the chord is a powerful primitive to have in your toolbox
|
|
@@ -698,7 +805,7 @@ as synchronization is a required step for many parallel algorithms.
|
|
|
|
|
|
Let's break the chord expression down:
|
|
|
|
|
|
-.. code-block:: python
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> callback = tsum.s()
|
|
|
>>> header = [add.s(i, i) for i in range(100)]
|
|
@@ -725,11 +832,14 @@ Errors will propagate to the callback, so the callback will not be executed
|
|
|
instead the callback changes to failure state, and the error is set
|
|
|
to the :exc:`~@ChordError` exception:
|
|
|
|
|
|
-.. code-block:: python
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
|
|
|
>>> result = c()
|
|
|
>>> result.get()
|
|
|
+
|
|
|
+.. code-block:: pytb
|
|
|
+
|
|
|
Traceback (most recent call last):
|
|
|
File "<stdin>", line 1, in <module>
|
|
|
File "*/celery/result.py", line 120, in get
|
|
@@ -833,7 +943,7 @@ They differ from group in that
|
|
|
|
|
|
For example using ``map``:
|
|
|
|
|
|
-.. code-block:: python
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> from proj.tasks import add
|
|
|
|
|
@@ -848,7 +958,9 @@ is the same as having a task doing:
|
|
|
def temp():
|
|
|
return [xsum(range(10)), xsum(range(100))]
|
|
|
|
|
|
-and using ``starmap``::
|
|
|
+and using ``starmap``:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> ~add.starmap(zip(range(10), range(10)))
|
|
|
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
|
|
@@ -863,7 +975,9 @@ is the same as having a task doing:
|
|
|
|
|
|
Both ``map`` and ``starmap`` are signature objects, so they can be used as
|
|
|
other signatures and combined in groups etc., for example
|
|
|
-to call the starmap after 10 seconds::
|
|
|
+to call the starmap after 10 seconds:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)
|
|
|
|
|
@@ -883,14 +997,14 @@ it may considerably increase performance.
|
|
|
|
|
|
To create a chunks signature you can use :meth:`@Task.chunks`:
|
|
|
|
|
|
-.. code-block:: python
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> add.chunks(zip(range(100), range(100)), 10)
|
|
|
|
|
|
As with :class:`~celery.group` the act of sending the messages for
|
|
|
the chunks will happen in the current process when called:
|
|
|
|
|
|
-.. code-block:: python
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> from proj.tasks import add
|
|
|
|
|
@@ -909,16 +1023,22 @@ the chunks will happen in the current process when called:
|
|
|
|
|
|
while calling ``.apply_async`` will create a dedicated
|
|
|
task so that the individual tasks are applied in a worker
|
|
|
-instead::
|
|
|
+instead:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> add.chunks(zip(range(100), range(100)), 10).apply_async()
|
|
|
|
|
|
-You can also convert chunks to a group::
|
|
|
+You can also convert chunks to a group:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> group = add.chunks(zip(range(100), range(100)), 10).group()
|
|
|
|
|
|
and with the group skew the countdown of each task by increments
|
|
|
-of one::
|
|
|
+of one:
|
|
|
+
|
|
|
+.. code-block:: pycon
|
|
|
|
|
|
>>> group.skew(start=1, stop=10)()
|
|
|
|