.. _guide-canvas:

=============================
 Canvas: Designing Workflows
=============================

.. contents::
    :local:
    :depth: 2

.. _canvas-subtasks:
.. _canvas-signatures:

Signatures
==========
.. versionadded:: 2.0

You just learned how to call a task using the task's ``delay`` method
in the :ref:`calling <guide-calling>` guide, and this is often all you need,
but sometimes you may want to pass the signature of a task invocation to
another process or as an argument to another function.

A :func:`~celery.signature` wraps the arguments, keyword arguments, and execution options
of a single task invocation in a way such that it can be passed to functions
or even serialized and sent across the wire.

Signatures are often nicknamed "subtasks" because they describe a task to be called
within a task.

- You can create a signature for the ``add`` task using its name like this::

    >>> from celery import signature
    >>> signature('tasks.add', args=(2, 2), countdown=10)
    tasks.add(2, 2)

  This task has a signature of arity 2 (two arguments): ``(2, 2)``,
  and sets the countdown execution option to 10.

- or you can create one using the task's ``subtask`` method::

    >>> add.subtask((2, 2), countdown=10)
    tasks.add(2, 2)

- There's also a shortcut using star arguments::

    >>> add.s(2, 2)
    tasks.add(2, 2)

- Keyword arguments are also supported::

    >>> add.s(2, 2, debug=True)
    tasks.add(2, 2, debug=True)

- From any signature instance you can inspect the different fields::

    >>> s = add.subtask((2, 2), {'debug': True}, countdown=10)
    >>> s.args
    (2, 2)
    >>> s.kwargs
    {'debug': True}
    >>> s.options
    {'countdown': 10}

- It supports the "Calling API", which means it supports ``delay`` and
  ``apply_async``, as well as being called directly.

  Calling the signature will execute the task inline in the current process::

    >>> add(2, 2)
    4
    >>> add.s(2, 2)()
    4

  ``delay`` is our beloved shortcut to ``apply_async``, taking star-arguments::

    >>> result = add.delay(2, 2)
    >>> result.get()
    4

  ``apply_async`` takes the same arguments as the
  :meth:`Task.apply_async <@Task.apply_async>` method::

    >>> add.apply_async(args, kwargs, **options)
    >>> add.subtask(args, kwargs, **options).apply_async()

    >>> add.apply_async((2, 2), countdown=1)
    >>> add.subtask((2, 2), countdown=1).apply_async()

- You can't define options with :meth:`~@Task.s`, but a chaining
  ``set`` call takes care of that::

    >>> add.s(2, 2).set(countdown=1)
    proj.tasks.add(2, 2)
Partials
--------

With a signature, you can execute the task in a worker::

    >>> add.s(2, 2).delay()
    >>> add.s(2, 2).apply_async(countdown=1)

Or you can call it directly in the current process::

    >>> add.s(2, 2)()
    4

Specifying additional args, kwargs, or options to ``apply_async``/``delay``
creates partials:

- Any arguments added will be prepended to the args in the signature::

    >>> partial = add.s(2)          # incomplete signature
    >>> partial.delay(4)            # 4 + 2
    >>> partial.apply_async((4,))   # same

- Any keyword arguments added will be merged with the kwargs in the signature,
  with the new keyword arguments taking precedence::

    >>> s = add.s(2, 2)
    >>> s.delay(debug=True)                    # -> add(2, 2, debug=True)
    >>> s.apply_async(kwargs={'debug': True})  # same

- Any options added will be merged with the options in the signature,
  with the new options taking precedence::

    >>> s = add.subtask((2, 2), countdown=10)
    >>> s.apply_async(countdown=1)  # countdown is now 1

You can also clone signatures to create derivatives::

    >>> s = add.s(2)
    proj.tasks.add(2)

    >>> s.clone(args=(4,), kwargs={'debug': True})
    proj.tasks.add(4, 2, debug=True)
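The prepend-and-merge rules above can be sketched in plain Python. This is a toy stand-in for Celery's ``Signature`` class, written only to illustrate the semantics, not the real implementation:

```python
# Toy stand-in for a Celery signature, to illustrate partials
# (simplified; not Celery's real Signature class).
class Sig:
    def __init__(self, fn, *args, **kwargs):
        self.fn, self.args, self.kwargs = fn, args, kwargs

    def __call__(self, *args, **kwargs):
        # New positional args are *prepended*; new kwargs take precedence.
        merged = {**self.kwargs, **kwargs}
        return self.fn(*(args + self.args), **merged)

def add(x, y):
    return x + y

partial = Sig(add, 2)    # incomplete signature, like add.s(2)
print(partial(4))        # prepended: add(4, 2) -> 6
```

Calling ``partial(4)`` here mirrors ``partial.delay(4)``: the new argument lands in front of the stored one.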
Immutability
------------

.. versionadded:: 3.0

Partials are meant to be used with callbacks: any tasks linked, or chord
callbacks, will be applied with the result of the parent task.
Sometimes you want to specify a callback that does not take
additional arguments, and in that case you can set the signature
to be immutable::

    >>> add.apply_async((2, 2), link=reset_buffers.subtask(immutable=True))

The ``.si()`` shortcut can also be used to create immutable signatures::

    >>> add.apply_async((2, 2), link=reset_buffers.si())

Only the execution options can be set when a signature is immutable,
so it's not possible to call the signature with partial args/kwargs.

.. note::

    In this tutorial I sometimes use the prefix operator `~` with signatures.
    You probably shouldn't use it in your production code, but it's a handy shortcut
    when experimenting in the Python shell::

        >>> ~sig

        >>> # is the same as
        >>> sig.delay().get()
.. _canvas-callbacks:

Callbacks
---------

.. versionadded:: 3.0

Callbacks can be added to any task using the ``link`` argument
to ``apply_async``::

    add.apply_async((2, 2), link=other_task.s())

The callback will only be applied if the task exited successfully,
and it will be applied with the return value of the parent task as argument.

As I mentioned earlier, any arguments you add to a signature
will be prepended to the arguments specified by the signature itself!

If you have the signature::

    >>> sig = add.s(10)

then `sig.delay(result)` becomes::

    >>> add.apply_async(args=(result, 10))

...

Now let's call our ``add`` task with a callback using partial
arguments::

    >>> add.apply_async((2, 2), link=add.s(8))

As expected this will first launch one task calculating :math:`2 + 2`, then
another task calculating :math:`4 + 8`.
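The flow of that example can be traced in plain Python, since the parent's return value is delivered as the callback's first (prepended) argument. This is a local sketch of the semantics; in reality each call runs as a separate task on a worker:

```python
def add(x, y):
    return x + y

# The worker runs the parent task first...
parent_result = add(2, 2)               # 4

# ...then applies the linked callback add.s(8), with the parent's
# return value prepended to the callback's own argument list.
callback_result = add(parent_result, 8)
print(callback_result)                  # 12
```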
The Primitives
==============

.. versionadded:: 3.0

.. topic:: Overview

    - ``group``

        The group primitive is a signature that takes a list of tasks that should
        be applied in parallel.

    - ``chain``

        The chain primitive lets us link together signatures so that one is called
        after the other, essentially forming a *chain* of callbacks.

    - ``chord``

        A chord is just like a group but with a callback. A chord consists
        of a header group and a body, where the body is a task that should execute
        after all of the tasks in the header are complete.

    - ``map``

        The map primitive works like the built-in ``map`` function, but creates
        a temporary task where a list of arguments is applied to the task.
        E.g. ``task.map([1, 2])`` results in a single task
        being called, applying the arguments in order to the task function so
        that the result is::

            res = [task(1), task(2)]

    - ``starmap``

        Works exactly like map except the arguments are applied as ``*args``.
        For example ``add.starmap([(2, 2), (4, 4)])`` results in a single
        task calling::

            res = [add(2, 2), add(4, 4)]

    - ``chunks``

        Chunking splits a long list of arguments into parts, e.g. the operation::

            >>> items = zip(xrange(1000), xrange(1000))  # 1000 items
            >>> add.chunks(items, 10)

        will split the list of items into chunks of 10, resulting in 100
        tasks (each processing 10 items in sequence).

The primitives are also signature objects themselves, so that they can be combined
in any number of ways to compose complex workflows.

Here are some examples:
- Simple chain

  Here's a simple chain; the first task executes, passing its return value
  to the next task in the chain, and so on.

  .. code-block:: python

      >>> from celery import chain

      # 2 + 2 + 4 + 8
      >>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
      >>> res.get()
      16

  This can also be written using pipes::

      >>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
      16

- Immutable signatures

  Signatures can be partial, so arguments can be
  added to the existing arguments, but you may not always want that,
  for example if you don't want the result of the previous task in a chain.

  In that case you can mark the signature as immutable, so that the arguments
  cannot be changed::

      >>> add.subtask((2, 2), immutable=True)

  There's also an ``.si`` shortcut for this::

      >>> add.si(2, 2)

  Now you can create a chain of independent tasks instead::

      >>> res = (add.si(2, 2) | add.si(4, 4) | add.s(8, 8))()
      >>> res.get()
      16

      >>> res.parent.get()
      8

      >>> res.parent.parent.get()
      4

- Simple group

  You can easily create a group of tasks to execute in parallel::

      >>> from celery import group
      >>> res = group(add.s(i, i) for i in xrange(10))()
      >>> res.get(timeout=1)
      [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

- Simple chord

  The chord primitive enables us to add a callback to be called when
  all of the tasks in a group have finished executing, which is often
  required for algorithms that aren't embarrassingly parallel::

      >>> from celery import chord
      >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
      >>> res.get()
      90

  The above example creates 10 tasks that all start in parallel,
  and when all of them are complete the return values are combined
  into a list and sent to the ``xsum`` task.

  The body of a chord can also be immutable, so that the return value
  of the group is not passed on to the callback::

      >>> chord((import_contact.s(c) for c in contacts),
      ...       notify_complete.si(import_id)).apply_async()

  Note the use of ``.si`` above, which creates an immutable signature.

- Blow your mind by combining

  Chains can be partial too::

      >>> c1 = (add.s(4) | mul.s(8))

      # (16 + 4) * 8
      >>> res = c1(16)
      >>> res.get()
      160

  Which means that you can combine chains::

      # ((4 + 16) * 2 + 4) * 8
      >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))

      >>> res = c2()
      >>> res.get()
      352

  Chaining a group together with another task will automatically
  upgrade it to be a chord::

      >>> c3 = (group(add.s(i, i) for i in xrange(10)) | xsum.s())
      >>> res = c3()
      >>> res.get()
      90

  Groups and chords accept partial arguments too, so in a chain
  the return value of the previous task is forwarded to all tasks in the group::

      >>> new_user_workflow = (create_user.s() | group(
      ...                      import_contacts.s(),
      ...                      send_welcome_email.s()))
      ... new_user_workflow.delay(username='artv',
      ...                         first='Art',
      ...                         last='Vandelay',
      ...                         email='art@vandelay.com')

  If you don't want to forward arguments to the group then
  you can make the signatures in the group immutable::

      >>> res = (add.s(4, 4) | group(add.si(i, i) for i in xrange(10)))()
      >>> res.get()
      <GroupResult: de44df8c-821d-4c84-9a6a-44769c738f98 [
          bc01831b-9486-4e51-b046-480d7c9b78de,
          2650a1b8-32bf-4771-a645-b0a35dcc791b,
          dcbee2a5-e92d-4b03-b6eb-7aec60fd30cf,
          59f92e0a-23ea-41ce-9fad-8645a0e7759c,
          26e1e707-eccf-4bf4-bbd8-1e1729c3cce3,
          2d10a5f4-37f0-41b2-96ac-a973b1df024d,
          e13d3bdb-7ae3-4101-81a4-6f17ee21df2d,
          104b2be0-7b75-44eb-ac8e-f9220bdfa140,
          c5c551a5-0386-4973-aa37-b65cbeb2624b,
          83f72d71-4b71-428e-b604-6f16599a9f37]>

      >>> res.parent.get()
      8
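How results flow (or don't) through such chains can be sketched in plain Python: a mutable signature gets the parent's result prepended to its args, while an immutable one ignores it. This is a simplified local model of the semantics, not Celery's implementation:

```python
def add(x, y):
    return x + y

def run_chain(steps):
    """Evaluate (func, args, immutable) steps in order.

    Simplified model: a mutable step gets the previous result
    prepended to its args; an immutable step ignores it.
    """
    result = None
    for func, args, immutable in steps:
        if immutable or result is None:
            result = func(*args)
        else:
            result = func(result, *args)   # parent result is prepended
    return result

# like add.si(2, 2) | add.s(4): the second step receives the first result
print(run_chain([(add, (2, 2), True), (add, (4,), False)]))   # add(4, 4) -> 8

# like add.si(2, 2) | add.si(4, 4): the steps stay independent
print(run_chain([(add, (2, 2), True), (add, (4, 4), True)]))  # add(4, 4) -> 8
```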
.. _canvas-chain:

Chains
------

.. versionadded:: 3.0

Tasks can be linked together, which in practice means adding
a callback task::

    >>> res = add.apply_async((2, 2), link=mul.s(16))
    >>> res.get()
    4

The linked task will be applied with the result of its parent
task as the first argument, which in the above case will result
in ``mul(4, 16)`` since the result is 4.

The results will keep track of what subtasks a task applies,
and this can be accessed from the result instance::

    >>> res.children
    [<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]

    >>> res.children[0].get()
    64

The result instance also has a :meth:`~@AsyncResult.collect` method
that treats the result as a graph, enabling you to iterate over
the results::

    >>> list(res.collect())
    [(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
     (<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]

By default :meth:`~@AsyncResult.collect` will raise an
:exc:`~@IncompleteStream` exception if the graph is not fully
formed (one of the tasks has not completed yet),
but you can get an intermediate representation of the graph
too::

    >>> for result, value in res.collect(intermediate=True):
    ....

You can link together as many tasks as you like,
and signatures can be linked too::

    >>> s = add.s(2, 2)
    >>> s.link(mul.s(4))
    >>> s.link(log_result.s())

You can also add *error callbacks* using the ``link_error`` argument::

    >>> add.apply_async((2, 2), link_error=log_error.s())

    >>> add.subtask((2, 2), link_error=log_error.s())

Since exceptions can only be serialized when pickle is used,
the error callbacks take the id of the parent task as argument instead:

.. code-block:: python

    from __future__ import print_function

    import os

    from proj.celery import app

    @app.task
    def log_error(task_id):
        result = app.AsyncResult(task_id)
        result.get(propagate=False)  # make sure result written.
        with open(os.path.join('/var/errors', task_id), 'a') as fh:
            print('--\n\n{0} {1} {2}'.format(
                task_id, result.result, result.traceback), file=fh)

To make it even easier to link tasks together there is
a special signature called :class:`~celery.chain` that lets
you chain tasks together:

.. code-block:: python

    >>> from celery import chain
    >>> from proj.tasks import add, mul

    # (4 + 4) * 8 * 10
    >>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
    proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)

Calling the chain will call the tasks in the current process
and return the result of the last task in the chain::

    >>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
    >>> res.get()
    640

It also sets ``parent`` attributes so that you can
work your way up the chain to get intermediate results::

    >>> res.parent.get()
    64

    >>> res.parent.parent.get()
    8

    >>> res.parent.parent
    <AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>

Chains can also be made using the ``|`` (pipe) operator::

    >>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()
Graphs
~~~~~~

In addition you can work with the result graph as a
:class:`~celery.datastructures.DependencyGraph`:

.. code-block:: python

    >>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()

    >>> res.parent.parent.graph
    285fa253-fcf8-42ef-8b95-0078897e83e6(1)
        463afec2-5ed4-4036-b22d-ba067ec64f52(0)
    872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
        285fa253-fcf8-42ef-8b95-0078897e83e6(1)
            463afec2-5ed4-4036-b22d-ba067ec64f52(0)

You can even convert these graphs to *dot* format::

    >>> with open('graph.dot', 'w') as fh:
    ...     res.parent.parent.graph.to_dot(fh)

and create images:

.. code-block:: bash

    $ dot -Tpng graph.dot -o graph.png

.. image:: ../images/result_graph.png
.. _canvas-group:

Groups
------

.. versionadded:: 3.0

A group can be used to execute several tasks in parallel.

The :class:`~celery.group` function takes a list of signatures::

    >>> from celery import group
    >>> from proj.tasks import add

    >>> group(add.s(2, 2), add.s(4, 4))
    (proj.tasks.add(2, 2), proj.tasks.add(4, 4))

If you **call** the group, the tasks will be applied
one after another in the current process, and a :class:`~celery.result.GroupResult`
instance is returned which can be used to keep track of the results,
or tell how many tasks are ready, and so on::

    >>> g = group(add.s(2, 2), add.s(4, 4))
    >>> res = g()
    >>> res.get()
    [4, 8]

Group also supports iterators::

    >>> group(add.s(i, i) for i in xrange(100))()

A group is a signature object, so it can be used in combination
with other signatures.

Group Results
~~~~~~~~~~~~~

The group task returns a special result too;
this result works just like normal task results, except
that it works on the group as a whole::

    >>> from celery import group
    >>> from tasks import add

    >>> job = group([
    ...     add.s(2, 2),
    ...     add.s(4, 4),
    ...     add.s(8, 8),
    ...     add.s(16, 16),
    ...     add.s(32, 32),
    ... ])

    >>> result = job.apply_async()

    >>> result.ready()       # have all subtasks completed?
    True
    >>> result.successful()  # were all subtasks successful?
    True
    >>> result.get()
    [4, 8, 16, 32, 64]

The :class:`~celery.result.GroupResult` takes a list of
:class:`~celery.result.AsyncResult` instances and operates on them as
if they were a single task.

It supports the following operations:

* :meth:`~celery.result.GroupResult.successful`

    Return :const:`True` if all of the subtasks finished
    successfully (e.g. did not raise an exception).

* :meth:`~celery.result.GroupResult.failed`

    Return :const:`True` if any of the subtasks failed.

* :meth:`~celery.result.GroupResult.waiting`

    Return :const:`True` if any of the subtasks
    is not ready yet.

* :meth:`~celery.result.GroupResult.ready`

    Return :const:`True` if all of the subtasks
    are ready.

* :meth:`~celery.result.GroupResult.completed_count`

    Return the number of completed subtasks.

* :meth:`~celery.result.GroupResult.revoke`

    Revoke all of the subtasks.

* :meth:`~celery.result.GroupResult.join`

    Gather the results of all the subtasks
    and return a list with them ordered by the order in which they
    were called.
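The aggregate semantics above can be sketched with plain Python over a list of per-subtask records. This is a toy model of the behavior, not Celery's real result classes:

```python
# Toy model of GroupResult aggregation over subtask states;
# each record is (ready, failed, value) -- not Celery's real classes.
results = [(True, False, 4), (True, False, 8), (True, False, 16)]

ready      = all(r for r, _, _ in results)             # like ready()
successful = all(r and not f for r, f, _ in results)   # like successful()
failed     = any(f for _, f, _ in results)             # like failed()
joined     = [v for _, _, v in results]                # like join(): call order

print(ready, successful, failed, joined)   # True True False [4, 8, 16]
```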
.. _canvas-chord:

Chords
------

.. versionadded:: 2.3

.. note::

    Tasks used within a chord must *not* ignore their results. If the result
    backend is disabled for *any* task (header or body) in your chord you
    should read ":ref:`chord-important-notes`".

A chord is a task that only executes after all of the tasks in a group have
finished executing.

Let's calculate the sum of the expression
:math:`1 + 1 + 2 + 2 + 3 + 3 ... n + n` up to a hundred digits.

First you need two tasks, :func:`add` and :func:`tsum` (:func:`sum` is
already a standard function):

.. code-block:: python

    @app.task
    def add(x, y):
        return x + y

    @app.task
    def tsum(numbers):
        return sum(numbers)

Now you can use a chord to calculate each addition step in parallel, and then
get the sum of the resulting numbers::

    >>> from celery import chord
    >>> from tasks import add, tsum

    >>> chord(add.s(i, i)
    ...       for i in xrange(100))(tsum.s()).get()
    9900

This is obviously a very contrived example; the overhead of messaging and
synchronization makes this a lot slower than its Python counterpart::

    sum(i + i for i in xrange(100))

The synchronization step is costly, so you should avoid using chords as much
as possible. Still, the chord is a powerful primitive to have in your toolbox,
as synchronization is a required step for many parallel algorithms.

Let's break the chord expression down:

.. code-block:: python

    >>> callback = tsum.s()
    >>> header = [add.s(i, i) for i in range(100)]
    >>> result = chord(header)(callback)
    >>> result.get()
    9900

Remember, the callback can only be executed after all of the tasks in the
header have returned. Each step in the header is executed as a task, in
parallel, possibly on different nodes. The callback is then applied with
the return value of each task in the header. The task id returned by
:meth:`chord` is the id of the callback, so you can wait for it to complete
and get the final return value (but remember to :ref:`never have a task wait
for other tasks <task-synchronous-subtasks>`).
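The header/body split can be modeled in plain Python. This is a local sketch of the semantics only; a real chord runs the header tasks in parallel on workers:

```python
def add(x, y):
    return x + y

def tsum(numbers):
    return sum(numbers)

# Header: every task's return value is collected, in order.
header = [(add, (i, i)) for i in range(100)]
header_results = [fn(*args) for fn, args in header]   # the "parallel" part

# Body: the callback is applied to the list of header results.
result = tsum(header_results)
print(result)   # 9900
```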
.. _chord-errors:

Error handling
~~~~~~~~~~~~~~

So what happens if one of the tasks raises an exception?

This was not documented for some time, and before version 3.1
the exception value would be forwarded to the chord callback.

From 3.1 errors will propagate to the callback, so the callback will not be executed;
instead the callback changes to the failure state, and the error is set
to the :exc:`~@ChordError` exception:

.. code-block:: python

    >>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
    >>> result = c()
    >>> result.get()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "*/celery/result.py", line 120, in get
        interval=interval)
      File "*/celery/backends/amqp.py", line 150, in wait_for
        raise self.exception_to_python(meta['result'])
    celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
        raised ValueError('something something',)

If you're running 3.0.14 or later you can enable the new behavior via
the :setting:`CELERY_CHORD_PROPAGATES` setting::

    CELERY_CHORD_PROPAGATES = True

While the traceback may be different depending on which result backend is
being used, you can see that the error description includes the id of the task that failed
and a string representation of the original exception. You can also
find the original traceback in ``result.traceback``.

Note that the rest of the tasks will still execute, so the third task
(``add.s(8, 8)``) is still executed even though the middle task failed.
Also, the :exc:`~@ChordError` only shows the task that failed
first (in time): it does not respect the ordering of the header group.
.. _chord-important-notes:

Important Notes
~~~~~~~~~~~~~~~

Tasks used within a chord must *not* ignore their results. In practice this
means that you must enable a :const:`CELERY_RESULT_BACKEND` in order to use
chords. Additionally, if :const:`CELERY_IGNORE_RESULT` is set to :const:`True`
in your configuration, be sure that the individual tasks to be used within
the chord are defined with :const:`ignore_result=False`. This applies to both
Task subclasses and decorated tasks.

Example Task subclass:

.. code-block:: python

    class MyTask(Task):
        abstract = True
        ignore_result = False

Example decorated task:

.. code-block:: python

    @app.task(ignore_result=False)
    def another_task(project):
        do_something()

By default the synchronization step is implemented by having a recurring task
poll the completion of the group every second, calling the signature when
ready.

Example implementation:

.. code-block:: python

    from celery import maybe_signature

    @app.task(bind=True)
    def unlock_chord(self, group, callback, interval=1, max_retries=None):
        if group.ready():
            return maybe_signature(callback).delay(group.join())
        raise self.retry(countdown=interval, max_retries=max_retries)

This is used by all result backends except Redis and Memcached, which
increment a counter after each task in the header, then apply the callback
when the counter exceeds the number of tasks in the set. *Note:* chords do not
properly work with Redis before version 2.2; you will need to upgrade to at
least 2.2 to use them.

The Redis and Memcached approach is a much better solution, but not easily
implemented in other backends (suggestions welcome!).

.. note::

    If you are using chords with the Redis result backend and also overriding
    the :meth:`Task.after_return` method, you need to make sure to call the
    super method or else the chord callback will not be applied.

    .. code-block:: python

        def after_return(self, *args, **kwargs):
            do_something()
            super(MyTask, self).after_return(*args, **kwargs)
.. _canvas-map:

Map & Starmap
-------------

:class:`~celery.map` and :class:`~celery.starmap` are built-in tasks
that call the task for every element in a sequence.

They differ from group in that

- only one task message is sent

- the operation is sequential.

For example using ``map``:

.. code-block:: python

    >>> from proj.tasks import add, xsum

    >>> ~xsum.map([range(10), range(100)])
    [45, 4950]

is the same as having a task doing:

.. code-block:: python

    @app.task
    def temp():
        return [xsum(range(10)), xsum(range(100))]

and using ``starmap``::

    >>> ~add.starmap(zip(range(10), range(10)))
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

is the same as having a task doing:

.. code-block:: python

    @app.task
    def temp():
        return [add(i, i) for i in range(10)]

Both ``map`` and ``starmap`` are signature objects, so they can be used as
other signatures and combined in groups etc., for example
to call the starmap after 10 seconds::

    >>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)
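The two equivalences can be checked in plain Python, since ``map`` passes each element as a single argument while ``starmap`` unpacks each element as ``*args``. This is a local sketch, with no worker involved:

```python
def add(x, y):
    return x + y

def xsum(numbers):
    return sum(numbers)

# map semantics: each element is passed as a single argument
map_result = [xsum(arg) for arg in [range(10), range(100)]]
print(map_result)    # [45, 4950]

# starmap semantics: each element is unpacked as *args
star_result = [add(*args) for args in zip(range(10), range(10))]
print(star_result)   # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```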
.. _canvas-chunks:

Chunks
------

Chunking lets you divide an iterable of work into pieces, so that if
you have one million objects, you can create 10 tasks with a hundred
thousand objects each.

Some may worry that chunking your tasks results in a degradation
of parallelism, but this is rarely true for a busy cluster,
and in practice, since you are avoiding the overhead of messaging,
it may considerably increase performance.

To create a chunks signature you can use :meth:`@Task.chunks`:

.. code-block:: python

    >>> add.chunks(zip(range(100), range(100)), 10)

As with :class:`~celery.group`, the act of sending the messages for
the chunks will happen in the current process when called:

.. code-block:: python

    >>> from proj.tasks import add

    >>> res = add.chunks(zip(range(100), range(100)), 10)()
    >>> res.get()
    [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
     [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
     [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
     [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
     [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
     [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
     [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
     [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
     [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
     [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

while calling ``.apply_async`` will create a dedicated
task so that the individual tasks are applied in a worker
instead::

    >>> add.chunks(zip(range(100), range(100)), 10).apply_async()
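The splitting itself is simple to model in plain Python. This is a sketch of the arithmetic only (``chunked`` is a hypothetical helper, not Celery's implementation):

```python
def chunked(items, n):
    """Split a sequence into consecutive chunks of size n."""
    items = list(items)
    return [items[i:i + n] for i in range(0, len(items), n)]

def add(x, y):
    return x + y

chunks = chunked(zip(range(100), range(100)), 10)
print(len(chunks))   # 10 chunks of 10 argument tuples each

# Each chunk becomes one starmap-style task, applied sequentially:
first_chunk = [add(*args) for args in chunks[0]]
print(first_chunk)   # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```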
You can also convert chunks to a group::

    >>> group = add.chunks(zip(range(100), range(100)), 10).group()

and with the group skew the countdown of each task by increments
of one::

    >>> group.skew(start=1, stop=10)()

which means that the first task will have a countdown of 1, the second
a countdown of 2, and so on.
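The resulting countdowns can be sketched as a simple arithmetic sequence (a simplified model with a hypothetical helper; not Celery's exact ``skew`` code):

```python
def skew_countdowns(n_tasks, start=1, step=1):
    """Countdown assigned to each task: start, start + step, ... (simplified)."""
    return [start + i * step for i in range(n_tasks)]

print(skew_countdowns(10))   # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```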