.. _guide-sets:
.. _guide-groups:

=======================================
 Groups, Chords, Chains and Callbacks
=======================================

.. contents::
    :local:

.. _sets-subtasks:
.. _groups-subtasks:

Subtasks
========

.. versionadded:: 2.0

The :class:`~celery.task.sets.subtask` type is used to wrap the arguments and
execution options for a single task invocation:

.. code-block:: python

    from celery import subtask

    subtask(task_name_or_cls, args, kwargs, options)

For convenience every task also has a shortcut to create subtasks:

.. code-block:: python

    task.subtask(args, kwargs, options)
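For example, the ``add`` task from the tutorial could be wrapped with
partial arguments and an execution option like this (a small sketch; the
options dict takes the same keys as ``apply_async``, ``countdown`` being
just one example)::

    >>> from tasks import add

    >>> # args, kwargs, options
    >>> s = add.subtask((2, 2), {}, {"countdown": 10})
    >>> s.delay()   # apply it with the stored arguments and options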

:class:`~celery.task.sets.subtask` is actually a :class:`dict` subclass,
which means it can be serialized with JSON or other encodings that don't
support complex Python objects.

It can also be regarded as a type, as the following usage works::

    >>> s = subtask("tasks.add", args=(2, 2), kwargs={})

    >>> subtask(dict(s))  # coerce dict into subtask

This makes it excellent as a means to pass callbacks around to tasks.
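
As a sketch of that pattern, a task can accept a serialized subtask as an
argument and apply it once it has a result. The task and helper names below
(``fetch_page``, ``process_page``, ``download``) are hypothetical and only
used for illustration:

.. code-block:: python

    from celery import subtask
    from celery.task import task

    @task()
    def fetch_page(url, callback=None):
        page = download(url)   # hypothetical helper that fetches the URL
        if callback is not None:
            # After serialization the callback arrives as a plain dict,
            # so coerce it back into a subtask before applying it.
            subtask(callback).delay(page)

    # The caller passes any other task as the callback:
    # fetch_page.delay("http://example.com", callback=process_page.subtask())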

.. _sets-callbacks:
.. _groups-callbacks:

Callbacks
---------

Callbacks can be added to any task using the ``link`` argument
to ``apply_async``:

.. code-block:: python

    add.apply_async((2, 2), link=other_task.subtask())

The callback will only be applied if the task exited successfully,
and it will be applied with the return value of the parent task as argument.

The best thing is that any arguments you add to ``subtask``
will be prepended to the arguments specified by the subtask itself!

If you have the subtask::

    >>> add.subtask(args=(10, ))

then ``subtask.delay(result)`` becomes::

    >>> add.apply_async(args=(result, 10))

Now let's execute our ``add`` task with a callback using partial
arguments::

    >>> add.apply_async((2, 2), link=add.subtask((8, )))

As expected this will first launch one task calculating :math:`2 + 2`, then
another task calculating :math:`4 + 8`.

.. _sets-taskset:
.. _groups-group:

Groups
======

The :class:`~celery.task.sets.group` enables easy invocation of several
tasks at once, and is then able to join the results in the same order as the
tasks were invoked.

``group`` takes a list of :class:`~celery.task.sets.subtask` instances::

    >>> from celery import group
    >>> from tasks import add

    >>> job = group([
    ...             add.subtask((2, 2)),
    ...             add.subtask((4, 4)),
    ...             add.subtask((8, 8)),
    ...             add.subtask((16, 16)),
    ...             add.subtask((32, 32)),
    ... ])

    >>> result = job.apply_async()

    >>> result.ready()      # have all subtasks completed?
    True
    >>> result.successful() # were all subtasks successful?
    True
    >>> result.join()
    [4, 8, 16, 32, 64]

The first argument can alternatively be an iterator, like::

    >>> group(add.subtask((i, i)) for i in range(100))

.. _sets-results:

Results
-------

When a :class:`~celery.task.sets.group` is applied it returns a
:class:`~celery.result.TaskSetResult` object.

:class:`~celery.result.TaskSetResult` takes a list of
:class:`~celery.result.AsyncResult` instances and operates on them as if
they were a single task.

It supports the following operations (a short usage sketch follows the list):

* :meth:`~celery.result.TaskSetResult.successful`

    Returns :const:`True` if all of the subtasks finished
    successfully (e.g. did not raise an exception).

* :meth:`~celery.result.TaskSetResult.failed`

    Returns :const:`True` if any of the subtasks failed.

* :meth:`~celery.result.TaskSetResult.waiting`

    Returns :const:`True` if any of the subtasks
    is not ready yet.

* :meth:`~celery.result.TaskSetResult.ready`

    Returns :const:`True` if all of the subtasks
    are ready.

* :meth:`~celery.result.TaskSetResult.completed_count`

    Returns the number of completed subtasks.

* :meth:`~celery.result.TaskSetResult.revoke`

    Revokes all of the subtasks.

* :meth:`~celery.result.TaskSetResult.iterate`

    Iterates over the return values of the subtasks
    as they finish, one by one.

* :meth:`~celery.result.TaskSetResult.join`

    Gathers the results of all of the subtasks
    and returns a list with them ordered by the order in which they
    were called.
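
Reusing the ``job`` group from above, a brief usage sketch (the
intermediate values shown are illustrative and depend on how many of the
subtasks have finished when each call is made)::

    >>> result = job.apply_async()

    >>> result.completed_count()    # subtasks finished so far
    3
    >>> result.waiting()            # still waiting on some subtasks?
    True
    >>> result.join()               # wait for the rest and collect results
    [4, 8, 16, 32, 64]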

.. _chords:

Chords
======

.. versionadded:: 2.3

A chord is a task that only executes after all of the tasks in a taskset
have finished executing.

Let's calculate the sum of the expression
:math:`1 + 1 + 2 + 2 + 3 + 3 \ldots n + n` for the first hundred integers.

First we need two tasks, :func:`add` and :func:`tsum` (:func:`sum` is
already a standard function):

.. code-block:: python

    @celery.task
    def add(x, y):
        return x + y

    @celery.task
    def tsum(numbers):
        return sum(numbers)

Now we can use a chord to calculate each addition step in parallel, and then
get the sum of the resulting numbers::

    >>> from celery import chord
    >>> from tasks import add, tsum

    >>> chord(add.subtask((i, i))
    ...       for i in xrange(100))(tsum.subtask()).get()
    9900

This is obviously a very contrived example; the overhead of messaging and
synchronization makes this a lot slower than its Python counterpart::

    sum(i + i for i in xrange(100))

The synchronization step is costly, so you should avoid using chords as much
as possible. Still, the chord is a powerful primitive to have in your toolbox
as synchronization is a required step for many parallel algorithms.

Let's break the chord expression down::

    >>> callback = tsum.subtask()
    >>> header = [add.subtask((i, i)) for i in xrange(100)]
    >>> result = chord(header)(callback)
    >>> result.get()
    9900

Remember, the callback can only be executed after all of the tasks in the
header have returned. Each step in the header is executed as a task, in
parallel, possibly on different nodes. The callback is then applied with
the return value of each task in the header. The task id returned by
:meth:`chord` is the id of the callback, so you can wait for it to complete
and get the final return value (but remember to :ref:`never have a task wait
for other tasks <task-synchronous-subtasks>`).

.. _chord-important-notes:

Important Notes
---------------

By default the synchronization step is implemented by having a recurring task
poll the completion of the taskset every second, applying the subtask when
ready.

Example implementation:

.. code-block:: python

    @task()
    def unlock_chord(taskset, callback, interval=1, max_retries=None):
        if taskset.ready():
            # All header tasks are done: apply the callback with their results.
            return subtask(callback).delay(taskset.join())
        # Not done yet: check again after `interval` seconds.
        unlock_chord.retry(countdown=interval, max_retries=max_retries)

This is used by all result backends except Redis and Memcached, which
increment a counter after each task in the header and then apply the
callback when the counter exceeds the number of tasks in the set.

*Note:* chords do not work properly with Redis before version 2.2; you will
need to upgrade to at least 2.2 to use them.

The Redis and Memcached approach is a much better solution, but it is not
easily implemented in other backends (suggestions welcome!).
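
Conceptually the counter approach looks something like the sketch below.
This is not Celery's actual backend code; ``incr_counter``, ``header_size``,
``get_callback`` and ``join_results`` are hypothetical helpers standing in
for atomic backend operations:

.. code-block:: python

    from celery import subtask

    def on_task_done(taskset_id):
        # Called after a header task's result has been stored by the backend.
        finished = incr_counter(taskset_id)       # atomic increment
        if finished >= header_size(taskset_id):   # every header task done?
            callback = subtask(get_callback(taskset_id))
            # Apply the callback with the ordered list of header results.
            callback.delay(join_results(taskset_id))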

.. note::

    If you are using chords with the Redis result backend and also overriding
    the :meth:`Task.after_return` method, you need to make sure to call the
    super method or else the chord callback will not be applied.

    .. code-block:: python

        def after_return(self, *args, **kwargs):
            do_something()
            super(MyTask, self).after_return(*args, **kwargs)