.. _guide-sets:

=======================================
 Sets of tasks, Subtasks and Callbacks
=======================================

.. contents::
    :local:

.. _sets-subtasks:

Subtasks
========

.. versionadded:: 2.0

The :class:`~celery.task.sets.subtask` type is used to wrap the arguments and
execution options for a single task invocation::

    subtask(task_name_or_cls, args, kwargs, options)

For convenience every task also has a shortcut to create subtasks::

    task.subtask(args, kwargs, options)

:class:`~celery.task.sets.subtask` is actually a :class:`dict` subclass,
which means it can be serialized with JSON or other encodings that don't
support complex Python objects.

It can also be regarded as a type, as the following usage works::

    >>> s = subtask("tasks.add", args=(2, 2), kwargs={})
    >>> subtask(dict(s))  # coerce dict into subtask

This makes it excellent as a means to pass callbacks around to tasks.

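
The dict-subclass idea can be illustrated without Celery. The following is a
minimal sketch in plain Python 3; ``Signature`` is a hypothetical stand-in
written for this example, not the real ``subtask`` class, and its field names
are assumptions.

```python
import json


class Signature(dict):
    """Hypothetical stand-in for a subtask: a dict describing one call."""

    def __init__(self, task=None, args=(), kwargs=None, options=None):
        if isinstance(task, dict):
            # Coerce a plain dict (e.g. one decoded from JSON) into a Signature.
            super().__init__(task)
        else:
            super().__init__(task=task, args=list(args),
                             kwargs=dict(kwargs or {}),
                             options=dict(options or {}))


s = Signature("tasks.add", args=(2, 2))
wire = json.dumps(s)                    # serializes like any other dict
restored = Signature(json.loads(wire))  # plain dict becomes a Signature again
print(restored == s, type(restored).__name__)
```

Because the object carries only basic types, it survives the round trip
through JSON unchanged, which is what allows it to travel inside a task
message.
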
.. _sets-callbacks:

Callbacks
---------

Let's improve our `add` task so it can accept a callback that
takes the result as an argument::

    from celery.task import task
    from celery.task.sets import subtask

    @task
    def add(x, y, callback=None):
        result = x + y
        if callback is not None:
            subtask(callback).delay(result)
        return result

:class:`~celery.task.sets.subtask` also knows how it should be applied,
asynchronously by :meth:`~celery.task.sets.subtask.delay`, and
eagerly by :meth:`~celery.task.sets.subtask.apply`.

Best of all, any arguments you pass to `subtask.delay`
are prepended to the arguments specified by the subtask itself.

If you have the subtask::

    >>> add.subtask(args=(10, ))

`subtask.delay(result)` becomes::

    >>> add.apply_async(args=(result, 10))

...

Now let's execute our new `add` task with a callback::

    >>> add.delay(2, 2, callback=add.subtask((8, )))

As expected this will first launch one task calculating :math:`2 + 2`, then
another task calculating :math:`4 + 8`.

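
The prepending rule can be traced with a small local model. This is only a
sketch that runs everything synchronously in one process; ``LocalSubtask``,
``add_with_callback`` and the ``calls`` log are hypothetical names introduced
for the illustration.

```python
calls = []  # records every (x, y) pair that add() receives


def add(x, y):
    calls.append((x, y))
    return x + y


class LocalSubtask:
    """Hypothetical stand-in for a subtask bound to a plain function."""

    def __init__(self, func, args=()):
        self.func = func
        self.args = tuple(args)

    def delay(self, *call_args):
        # Arguments supplied at call time go before the stored ones.
        return self.func(*(tuple(call_args) + self.args))


def add_with_callback(x, y, callback=None):
    result = add(x, y)
    if callback is not None:
        callback.delay(result)  # result becomes the first argument
    return result


first = add_with_callback(2, 2, callback=LocalSubtask(add, (8,)))
print(first, calls)
```

The log shows the two invocations in order: first ``(2, 2)``, then ``(4, 8)``,
matching the sequence described above.
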
.. _sets-taskset:

Task Sets
=========

The :class:`~celery.task.sets.TaskSet` enables easy invocation of several
tasks at once, and is then able to join the results in the same order as the
tasks were invoked.

A task set takes a list of :class:`~celery.task.sets.subtask`'s::

    >>> from celery.task.sets import TaskSet
    >>> from tasks import add

    >>> job = TaskSet(tasks=[
    ...             add.subtask((4, 4)),
    ...             add.subtask((8, 8)),
    ...             add.subtask((16, 16)),
    ...             add.subtask((32, 32)),
    ... ])

    >>> result = job.apply_async()

    >>> result.ready()  # have all subtasks completed?
    True
    >>> result.successful()  # were all subtasks successful?
    True
    >>> result.join()
    [8, 16, 32, 64]

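
Joining in invocation order, rather than completion order, can be
demonstrated with the standard library alone. The sketch below uses
``concurrent.futures`` as a local substitute for workers; the artificial
sleep is an assumption added only to make tasks finish out of order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    # Larger inputs sleep less, so tasks complete in reverse submit order.
    time.sleep(0.05 / x)
    return x + y


pairs = [(4, 4), (8, 8), (16, 16), (32, 32)]
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(add, x, y) for x, y in pairs]
    # Walking the futures list, not the completion stream, preserves order.
    joined = [f.result() for f in futures]

print(joined)
```
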
.. _sets-results:

Results
-------

When a :class:`~celery.task.sets.TaskSet` is applied it returns a
:class:`~celery.result.TaskSetResult` object.

:class:`~celery.result.TaskSetResult` takes a list of
:class:`~celery.result.AsyncResult` instances and operates on them as if
they were a single task.

It supports the following operations:

* :meth:`~celery.result.TaskSetResult.successful`

    Returns :const:`True` if all of the subtasks finished
    successfully (i.e. did not raise an exception).

* :meth:`~celery.result.TaskSetResult.failed`

    Returns :const:`True` if any of the subtasks failed.

* :meth:`~celery.result.TaskSetResult.waiting`

    Returns :const:`True` if any of the subtasks
    is not ready yet.

* :meth:`~celery.result.TaskSetResult.ready`

    Returns :const:`True` if all of the subtasks
    are ready.

* :meth:`~celery.result.TaskSetResult.completed_count`

    Returns the number of completed subtasks.

* :meth:`~celery.result.TaskSetResult.revoke`

    Revokes all of the subtasks.

* :meth:`~celery.result.TaskSetResult.iterate`

    Iterates over the return values of the subtasks
    as they finish, one by one.

* :meth:`~celery.result.TaskSetResult.join`

    Gathers the results for all of the subtasks
    and returns them as a list, in the order in which the subtasks
    were called.

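
The aggregate predicates listed above reduce to ``all``/``any`` checks over
the individual results. The sketch below models that with standard-library
futures; ``GroupResult`` is a hypothetical class, and treating "completed" as
"finished without raising" is an assumption of this example.

```python
from concurrent.futures import Future


class GroupResult:
    """Hypothetical aggregate over several futures."""

    def __init__(self, futures):
        self.futures = list(futures)

    def ready(self):
        return all(f.done() for f in self.futures)

    def waiting(self):
        return any(not f.done() for f in self.futures)

    def failed(self):
        return any(f.done() and f.exception() is not None
                   for f in self.futures)

    def successful(self):
        return all(f.done() and f.exception() is None
                   for f in self.futures)

    def completed_count(self):
        return sum(1 for f in self.futures
                   if f.done() and f.exception() is None)


a, b, c = Future(), Future(), Future()
group = GroupResult([a, b, c])
a.set_result(8)
b.set_exception(ValueError("boom"))
before = (group.ready(), group.waiting(), group.failed(),
          group.completed_count())
c.set_result(64)  # the last pending member finishes
after = (group.ready(), group.waiting(), group.successful())
print(before, after)
```
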
.. _chords:

Chords
======

.. versionadded:: 2.3

A chord is a task that only executes after all of the tasks in a taskset have
finished executing.

Let's calculate the sum of the expression
:math:`1 + 1 + 2 + 2 + 3 + 3 ... n + n` up to a hundred digits.

First we need two tasks, :func:`add` and :func:`tsum` (:func:`sum` is
already a standard function):

.. code-block:: python

    from celery.task import task

    @task
    def add(x, y):
        return x + y

    @task
    def tsum(numbers):
        return sum(numbers)

Now we can use a chord to calculate each addition step in parallel, and then
get the sum of the resulting numbers::

    >>> from celery.task import chord
    >>> from tasks import add, tsum

    >>> chord(add.subtask((i, i))
    ...     for i in xrange(100))(tsum.subtask()).get()
    9900

This is obviously a very contrived example: the overhead of messaging and
synchronization makes this a lot slower than its Python counterpart::

    sum(i + i for i in xrange(100))

The synchronization step is costly, so you should avoid using chords as much
as possible. Still, the chord is a powerful primitive to have in your toolbox,
as synchronization is a required step for many parallel algorithms.

Let's break the chord expression down::

    >>> callback = tsum.subtask()
    >>> header = [add.subtask((i, i)) for i in xrange(100)]
    >>> result = chord(header)(callback)
    >>> result.get()
    9900

Remember, the callback can only be executed after all of the tasks in the
header have returned. Each step in the header is executed as a task, in
parallel, possibly on different nodes. The callback is then applied with
a list of the return values of the tasks in the header. The task id returned
by :meth:`chord` is the id of the callback, so you can wait for it to complete
and get the final return value (but remember to :ref:`never have a task wait
for other tasks <task-synchronous-subtasks>`).

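
The header-then-callback shape of a chord can be mimicked locally. This is a
sketch using a thread pool in place of workers and a message broker;
``local_chord`` is a hypothetical helper, and unlike a real chord it blocks
the caller until the header has finished.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def tsum(numbers):
    return sum(numbers)


def local_chord(header, callback):
    """Run every (func, args) pair in the header, then call callback once."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        # Synchronization point: wait for every header result, in order.
        results = [f.result() for f in futures]
    return callback(results)


header = [(add, (i, i)) for i in range(100)]
total = local_chord(header, tsum)
print(total)
```

The callback receives one list holding all header results, which is why
``tsum`` takes a single ``numbers`` argument.
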
.. _chord-important-notes:

Important Notes
---------------

By default the synchronization step is implemented by having a recurring task
poll the completion of the taskset every second, applying the subtask when
ready.

Example implementation:

.. code-block:: python

    def unlock_chord(taskset, callback, interval=1, max_retries=None):
        if taskset.ready():
            return subtask(callback).delay(taskset.join())
        unlock_chord.retry(countdown=interval, max_retries=max_retries)

This is used by all result backends except Redis, which increments a
counter after each task in the header and then applies the callback when the
counter exceeds the number of tasks in the set.

The Redis approach is a much better solution, but it is not easily implemented
in other backends (suggestions welcome!).

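
The counter-based idea can be sketched in memory, with a lock standing in for
an atomic increment in the backend. ``CounterChord`` and ``on_task_return``
are hypothetical names, and firing the callback when the counter reaches the
header size is an assumption about the exact comparison, not a description of
the Redis backend's code.

```python
import threading


class CounterChord:
    """Hypothetical in-memory analogue of counter-based synchronization."""

    def __init__(self, size, callback):
        self.size = size
        self.callback = callback
        self.results = [None] * size
        self.count = 0
        self.final = None
        self.lock = threading.Lock()

    def on_task_return(self, index, value):
        # Called once per header task; no polling task is needed.
        with self.lock:
            self.results[index] = value
            self.count += 1
            is_last = self.count == self.size
        if is_last:
            # Only the task that completes the set applies the callback.
            self.final = self.callback(self.results)


chord_state = CounterChord(4, sum)
threads = [threading.Thread(target=chord_state.on_task_return,
                            args=(i, i + i)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(chord_state.final)
```

Compared with polling, the work of detecting completion is done by the header
tasks themselves, so the callback is applied as soon as the last one returns.
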
.. note::

    If you are using chords with the Redis result backend and also overriding
    the :meth:`Task.after_return` method, you need to make sure to call the
    super method or else the chord callback will not be applied.

    .. code-block:: python

        def after_return(self, *args, **kwargs):
            do_something()
            super(MyTask, self).after_return(*args, **kwargs)