|
@@ -294,26 +294,11 @@ There is also a shortcut using star arguments::
|
|
|
>>> add.s(2, 2)
|
|
|
tasks.add(2, 2)
|
|
|
|
|
|
-and it also supports keyword arguments::
|
|
|
-
|
|
|
- >>> add.s(2, 2, debug=True)
|
|
|
- tasks.add(2, 2, debug=True)
|
|
|
-
|
|
|
-From any subtask instance we can inspect the different fields::
|
|
|
-
|
|
|
- >>> s = add.subtask((2, 2), {'debug': True}, countdown=10)
|
|
|
- >>> s.args
|
|
|
- (2, 2)
|
|
|
- >>> s.kwargs
|
|
|
- {'debug': True}
|
|
|
- >>> s.options
|
|
|
- {'countdown': 10}
|
|
|
-
|
|
|
And there's that calling API again...
|
|
|
-------------------------------------
|
|
|
|
|
|
-Subtask instances also supports the calling API, which means you can use
|
|
|
-``delay``, ``apply_async``, or *calling* it directly.
|
|
|
+Subtask instances also supports the calling API, which means that they
|
|
|
+have the ``delay`` and ``apply_async`` methods.
|
|
|
|
|
|
But there is a difference in that the subtask may already have
|
|
|
an argument signature specified. The ``add`` task takes two arguments,
|
|
@@ -366,190 +351,23 @@ To get to that we must introduce the canvas primitives...
|
|
|
The Primitives
|
|
|
--------------
|
|
|
|
|
|
-- ``group``
|
|
|
-
|
|
|
- The group primitive is a subtask that takes a list of tasks that should
|
|
|
- be applied in parallel.
|
|
|
-
|
|
|
-- ``chain``
|
|
|
-
|
|
|
- The chain primitive lets us link together subtasks so that one is called
|
|
|
- after the other, essentially forming a *chain* of callbacks.
|
|
|
-
|
|
|
-- ``chord``
|
|
|
-
|
|
|
- A chord is just like a group but with a callback. A group consists
|
|
|
- of a header group and a body, where the body is a task that should execute
|
|
|
- after all of the tasks in the header is complete.
|
|
|
-
|
|
|
-- ``map``
|
|
|
-
|
|
|
- The map primitive works like the built-in ``map`` function, but creates
|
|
|
- a temporary task where a list of arguments is applied to the task.
|
|
|
- E.g. ``task.map([1, 2])`` results in a single task
|
|
|
- being called, appyling the arguments in order to the task function so
|
|
|
- that the result is::
|
|
|
-
|
|
|
- res = [task(1), task(2)]
|
|
|
-
|
|
|
-- ``starmap``
|
|
|
-
|
|
|
- Works exactly like map except the arguments are applied as ``*args``.
|
|
|
- For example ``add.starmap([(2, 2), (4, 4)])`` results in a single
|
|
|
- task calling::
|
|
|
-
|
|
|
- res = [add(2, 2), add(4, 4)]
|
|
|
-
|
|
|
-- ``chunks``
|
|
|
+.. topic:: overview of primitives
|
|
|
|
|
|
- Chunking splits a long list of arguments into parts, e.g the operation::
|
|
|
-
|
|
|
- >>> add.chunks(zip(xrange(1000), xrange(1000), 10))
|
|
|
-
|
|
|
- will create 10 tasks that apply 100 items each.
|
|
|
+ .. hlist::
|
|
|
+ :columns: 2
|
|
|
|
|
|
+ - :ref:`group <canvas-group>`
|
|
|
+ - :ref:`chain <canvas-chain>`
|
|
|
+ - :ref:`chord <canvas-chord>`
|
|
|
+ - :ref:`map <canvas-map>`
|
|
|
+ - :ref:`starmap <canvas-map>`
|
|
|
+ - :ref:`chunks <canvas-chunks>`
|
|
|
|
|
|
The primitives are also subtasks themselves, so that they can be combined
|
|
|
in any number of ways to compose complex workflows.
|
|
|
|
|
|
Here's some examples::
|
|
|
|
|
|
-- Simple chain
|
|
|
-
|
|
|
- Here's a simple chain, the first task executes passing its return value
|
|
|
- to the next task in the chain, and so on.
|
|
|
-
|
|
|
- .. code-block:: python
|
|
|
-
|
|
|
- # 2 + 2 + 4 + 8
|
|
|
- >>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
|
|
|
- >>> res.get()
|
|
|
- 16
|
|
|
-
|
|
|
- This can also be written using pipes::
|
|
|
-
|
|
|
- >>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
|
|
|
- 16
|
|
|
-
|
|
|
-- Immutable subtasks
|
|
|
-
|
|
|
- As we have learned signatures can be partial, so that arguments can be
|
|
|
- added to the existing arguments, but you may not always want that,
|
|
|
- for example if you don't want the result of the previous task in a chain.
|
|
|
-
|
|
|
- In that case you can mark the subtask as immutable, so that the arguments
|
|
|
- cannot be changed::
|
|
|
-
|
|
|
- >>> add.subtask((2, 2), immutable=True)
|
|
|
-
|
|
|
- There's also an ``.si`` shortcut for this::
|
|
|
-
|
|
|
- >>> add.si(2, 2)
|
|
|
-
|
|
|
- Now we can create a chain of independent tasks instead::
|
|
|
-
|
|
|
- >>> res = (add.si(2, 2), add.si(4, 4), add.s(8, 8))()
|
|
|
- >>> res.get()
|
|
|
- 16
|
|
|
-
|
|
|
- >>> res.parent.get()
|
|
|
- 8
|
|
|
-
|
|
|
- >>> res.parent.parent.get()
|
|
|
- 4
|
|
|
-
|
|
|
-- Simple group
|
|
|
-
|
|
|
- We can easily create a group of tasks to execute in parallel::
|
|
|
-
|
|
|
- >>> res = group(add.s(i, i) for i in xrange(10))()
|
|
|
- >>> res.get(timeout=1)
|
|
|
- [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
|
|
|
-
|
|
|
- - For primitives `.apply_async` is special...
|
|
|
-
|
|
|
- as it will create a temporary task to apply the tasks in,
|
|
|
- for example by *applying the group*::
|
|
|
-
|
|
|
- >>> g = group(add.s(i, i) for i in xrange(10))
|
|
|
- >>> g() # << applying
|
|
|
-
|
|
|
- the act of sending the messages for the tasks in the group
|
|
|
- will happen in the current process,
|
|
|
- but with ``.apply_async`` this happens in a temporary task
|
|
|
- instead::
|
|
|
-
|
|
|
- >>> g = group(add.s(i, i) for i in xrange(10))
|
|
|
- >>> g.apply_async()
|
|
|
-
|
|
|
- This is useful because we can e.g. specify a time for the
|
|
|
- messages in the group to be called::
|
|
|
-
|
|
|
- >>> g.apply_async(countdown=10)
|
|
|
-
|
|
|
-- Simple chord
|
|
|
-
|
|
|
- The chord primitive enables us to add callback to be called when
|
|
|
- all of the tasks in a group has finished executing, which is often
|
|
|
- required for algorithms that aren't embarrassingly parallel::
|
|
|
-
|
|
|
- >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
|
|
|
- >>> res.get()
|
|
|
- 90
|
|
|
-
|
|
|
- The above example creates 10 task that all start in parallel,
|
|
|
- and when all of them is complete the return values is combined
|
|
|
- into a list and sent to the ``xsum`` task.
|
|
|
-
|
|
|
- The body of a chord can also be immutable, so that the return value
|
|
|
- of the group is not passed on to the callback::
|
|
|
-
|
|
|
- >>> chord((import_contact.s(c) for c in contacts),
|
|
|
- ... notify_complete.si(import_id)).apply_async()
|
|
|
-
|
|
|
- Note the use of ``.si`` above which creates an immutable subtask.
|
|
|
-
|
|
|
-- Blow your mind by combining
|
|
|
-
|
|
|
- Chains can be partial too::
|
|
|
-
|
|
|
- >>> c1 = (add.s(4) | mul.s(8))
|
|
|
-
|
|
|
- # (16 + 4) * 8
|
|
|
- >>> res = c1(16)
|
|
|
- >>> res.get()
|
|
|
- 160
|
|
|
-
|
|
|
- Which means that you can combine chains::
|
|
|
-
|
|
|
- # ((4 + 16) * 2 + 4) * 8
|
|
|
- >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))
|
|
|
-
|
|
|
- >>> res = c2()
|
|
|
- >>> res.get()
|
|
|
- 352
|
|
|
-
|
|
|
- Chaining a group together with another task will automatically
|
|
|
- upgrade it to be a chord::
|
|
|
-
|
|
|
- >>> c3 = (group(add.s(i, i) for i in xrange(10) | xsum.s()))
|
|
|
- >>> res = c3()
|
|
|
- >>> res.get()
|
|
|
- 90
|
|
|
-
|
|
|
- Groups and chords accepts partial arguments too, so in a chain
|
|
|
- the return value of the previous task is forwarded to all tasks in the group::
|
|
|
-
|
|
|
-
|
|
|
- >>> new_user_workflow = (create_user.s() | group(
|
|
|
- ... import_contacts.s(),
|
|
|
- ... send_welcome_email.s()))
|
|
|
- ... new_user_workflow.delay(username='artv',
|
|
|
- ... first='Art',
|
|
|
- ... last='Vandelay',
|
|
|
- ... email='art@vandelay.com')
|
|
|
-
|
|
|
-
|
|
|
Be sure to read more about workflows in the :ref:`Canvas <guide-canvas>` user
|
|
|
guide.
|
|
|
|