.. _guide-sets:

=======================================
 Sets of tasks, Subtasks and Callbacks
=======================================

.. contents::
    :local:

.. _sets-subtasks:

Subtasks
========

.. versionadded:: 2.0

The :class:`~celery.task.sets.subtask` type is used to wrap the arguments and
execution options for a single task invocation::

    subtask(task_name_or_cls, args, kwargs, options)

For convenience every task also has a shortcut to create subtasks::

    task.subtask(args, kwargs, options)

:class:`~celery.task.sets.subtask` is actually a :class:`dict` subclass,
which means it can be serialized with JSON or other encodings that don't
support complex Python objects.

It can also be used as a type, since a plain dict can be coerced back into a
subtask::

    >>> s = subtask("tasks.add", args=(2, 2), kwargs={})

    >>> subtask(dict(s))  # coerce dict into subtask

This makes subtasks well suited for passing callbacks around to tasks.

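Because a subtask is just a dict, it can survive a JSON round trip and be rebuilt on the other side. The sketch below illustrates this with a hypothetical stand-in class, `SubtaskSketch`; it is not Celery's implementation, and the four key names (`task`, `args`, `kwargs`, `options`) are an assumption chosen to mirror the constructor signature above.

.. code-block:: python

    import json


    class SubtaskSketch(dict):
        """Hypothetical stand-in for a subtask: a plain dict with four keys."""

        def __init__(self, task, args=None, kwargs=None, options=None):
            if isinstance(task, dict):
                # Coerce an existing mapping (e.g. decoded JSON) into a subtask.
                # JSON has no tuple type, so turn args back into a tuple.
                dict.__init__(self, task)
                self["args"] = tuple(self.get("args", ()))
            else:
                dict.__init__(self, task=task, args=tuple(args or ()),
                              kwargs=dict(kwargs or {}),
                              options=dict(options or {}))


    s = SubtaskSketch("tasks.add", args=(2, 2), kwargs={})
    wire = json.dumps(s)                        # plain JSON, no pickling needed
    restored = SubtaskSketch(json.loads(wire))  # coerce the decoded dict

Since nothing but dicts, lists, strings and numbers is involved, any encoding that handles those types can carry the subtask.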
.. _sets-callbacks:

Callbacks
---------

Let's improve our `add` task so it can accept a callback that
takes the result as an argument::

    from celery.task import task
    from celery.task.sets import subtask

    @task
    def add(x, y, callback=None):
        result = x + y
        if callback is not None:
            subtask(callback).delay(result)
        return result

:class:`~celery.task.sets.subtask` also knows how it should be applied:
asynchronously by :meth:`~celery.task.sets.subtask.delay`, and
eagerly by :meth:`~celery.task.sets.subtask.apply`.

The best thing is that any arguments you add to `subtask.delay`
will be prepended to the arguments specified by the subtask itself!

If you have the subtask::

    >>> add.subtask(args=(10, ))

`subtask.delay(result)` becomes::

    >>> add.apply_async(args=(result, 10))

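The argument merging can be sketched in plain Python. `merge_call` below is a hypothetical helper, not part of Celery: it applies the rule described above (call-time positional arguments go first, the subtask's stored arguments follow) and, as an additional assumption, lets call-time keyword arguments override stored ones.

.. code-block:: python

    def merge_call(stored_args, stored_kwargs, call_args=(), call_kwargs=None):
        """Hypothetical helper: combine a subtask's stored arguments with
        the arguments given at call time."""
        # Call-time positional arguments are prepended to the stored ones.
        args = tuple(call_args) + tuple(stored_args)
        # Assumption: call-time keyword arguments take precedence.
        kwargs = dict(stored_kwargs)
        kwargs.update(call_kwargs or {})
        return args, kwargs


    # add.subtask(args=(10, )) followed by .delay(result), with result == 4:
    args, kwargs = merge_call((10,), {}, call_args=(4,))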
Now let's execute our new `add` task with a callback::

    >>> add.delay(2, 2, callback=add.subtask((8, )))

As expected, this will first launch one task calculating :math:`2 + 2`, then
another task calculating :math:`4 + 8`.

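To trace the two steps without a broker, the callback pattern can be simulated eagerly in plain Python. Everything here is a hypothetical stand-in: `REGISTRY` replaces the task registry, a `(name, args)` tuple replaces a subtask, and a direct function call replaces `delay()`.

.. code-block:: python

    calls = []  # records every invocation of add, in order


    def add(x, y, callback=None):
        result = x + y
        calls.append((x, y, result))
        if callback is not None:
            name, stored_args = callback
            # Prepend the result to the callback's stored args and run it
            # eagerly (a stand-in for subtask(callback).delay(result)).
            REGISTRY[name](*((result,) + tuple(stored_args)))
        return result


    REGISTRY = {"add": add}

    add(2, 2, callback=("add", (8,)))

After the last call, `calls` holds `(2, 2, 4)` followed by `(4, 8, 12)`: the first task's result became the first argument of the callback.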
.. _sets-taskset:

Task Sets
=========

The :class:`~celery.task.sets.TaskSet` enables easy invocation of several
tasks at once, and is then able to join the results in the same order as the
tasks were invoked.

A task set takes a list of :class:`~celery.task.sets.subtask` instances::

    >>> from celery.task.sets import TaskSet
    >>> from tasks import add

    >>> job = TaskSet(tasks=[
    ...             add.subtask((4, 4)),
    ...             add.subtask((8, 8)),
    ...             add.subtask((16, 16)),
    ...             add.subtask((32, 32)),
    ... ])

    >>> result = job.apply_async()

    >>> result.ready()  # have all subtasks completed?
    True
    >>> result.successful()  # were all subtasks successful?
    True
    >>> result.join()
    [8, 16, 32, 64]

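The ordering guarantee of `join()` can be illustrated without Celery, using the standard library's `concurrent.futures` as a stand-in for workers. This is an analogy, not TaskSet's implementation: the sleep times are arbitrary and chosen so that later jobs tend to finish first, yet reading the futures in submission order still gives the invocation order.

.. code-block:: python

    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed


    def add(x, y, delay):
        time.sleep(delay)  # artificial delay so completion order can differ
        return x + y


    pairs = [(4, 4), (8, 8), (16, 16), (32, 32)]
    delays = [0.3, 0.2, 0.1, 0.0]

    with ThreadPoolExecutor(max_workers=len(pairs)) as pool:
        futures = [pool.submit(add, x, y, d) for (x, y), d in zip(pairs, delays)]
        # Completion order: whichever future finishes first comes first.
        finished = [f.result() for f in as_completed(futures)]
        # Invocation order: what a TaskSet join promises.
        joined = [f.result() for f in futures]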
.. _sets-results:

Results
-------

When a :class:`~celery.task.sets.TaskSet` is applied it returns a
:class:`~celery.result.TaskSetResult` object.

:class:`~celery.result.TaskSetResult` takes a list of
:class:`~celery.result.AsyncResult` instances and operates on them as if it
were a single task.

It supports the following operations:

* :meth:`~celery.result.TaskSetResult.successful`

    Returns :const:`True` if all of the subtasks finished
    successfully (i.e. did not raise an exception).

* :meth:`~celery.result.TaskSetResult.failed`

    Returns :const:`True` if any of the subtasks failed.

* :meth:`~celery.result.TaskSetResult.waiting`

    Returns :const:`True` if any of the subtasks
    is not ready yet.

* :meth:`~celery.result.TaskSetResult.ready`

    Returns :const:`True` if all of the subtasks
    are ready.

* :meth:`~celery.result.TaskSetResult.completed_count`

    Returns the number of completed subtasks.

* :meth:`~celery.result.TaskSetResult.revoke`

    Revokes all of the subtasks.

* :meth:`~celery.result.TaskSetResult.iterate`

    Iterates over the return values of the subtasks
    as they finish, one by one.

* :meth:`~celery.result.TaskSetResult.join`

    Gathers the results of all of the subtasks
    and returns them as a list, in the order in which the subtasks
    were called.

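To make the semantics of these operations concrete, here is a hypothetical analogue, `FutureSetResult`, built on `concurrent.futures` instead of result backends. It is not Celery's class: `revoke()` can only cancel futures that have not started, and `completed_count()` here simply counts finished futures.

.. code-block:: python

    from concurrent.futures import ThreadPoolExecutor, as_completed


    class FutureSetResult(object):
        """Hypothetical analogue of TaskSetResult over concurrent.futures."""

        def __init__(self, futures):
            self.futures = list(futures)

        def _failed(self, f):
            # Finished, not cancelled, and raised an exception.
            return f.done() and not f.cancelled() and f.exception() is not None

        def successful(self):
            return all(f.done() and not f.cancelled() and f.exception() is None
                       for f in self.futures)

        def failed(self):
            return any(self._failed(f) for f in self.futures)

        def waiting(self):
            return any(not f.done() for f in self.futures)

        def ready(self):
            return all(f.done() for f in self.futures)

        def completed_count(self):
            return sum(1 for f in self.futures if f.done())

        def revoke(self):
            # Best effort: only futures that have not started can be cancelled.
            for f in self.futures:
                f.cancel()

        def iterate(self):
            # Yield return values in completion order, one by one.
            for f in as_completed(self.futures):
                yield f.result()

        def join(self):
            # Block until all finish; return values in submission order.
            return [f.result() for f in self.futures]


    with ThreadPoolExecutor(max_workers=4) as pool:
        result = FutureSetResult(pool.submit(lambda x: x + x, n)
                                 for n in (4, 8, 16, 32))
        values = result.join()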
.. _chords:

Chords
======

A chord is a task that only executes after all of the tasks in a taskset have
finished executing.

Let's calculate the sum of the expression
:math:`0 + 0 + 1 + 1 + 2 + 2 + \cdots + 99 + 99`, that is :math:`i + i` for
every :math:`i` from 0 to 99.

First we need two tasks, :func:`add` and :func:`tsum` (:func:`sum` is
already a standard function):

.. code-block:: python

    from celery.task import task

    @task
    def add(x, y):
        return x + y

    @task
    def tsum(numbers):
        return sum(numbers)

Now we can use a chord to calculate each addition step in parallel, and then
get the sum of the resulting numbers::

    >>> from celery.task import chord
    >>> from tasks import add, tsum

    >>> chord(add.subtask((i, i))
    ...     for i in xrange(100))(tsum.subtask()).get()
    9900

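The expected result can be checked by hand: the header computes :math:`i + i = 2i` for :math:`i = 0, \ldots, 99`, and the callback sums those values.

.. math::

    \sum_{i=0}^{99} (i + i) = 2 \sum_{i=0}^{99} i = 2 \cdot \frac{99 \cdot 100}{2} = 9900
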
This is obviously a very contrived example; the overhead of messaging and
synchronization makes this a lot slower than its Python counterpart::

    sum(i + i for i in xrange(100))

The synchronization step is costly, so you should avoid using chords as much
as possible. Still, the chord is a powerful primitive to have in your toolbox,
as synchronization is a required step for many parallel algorithms.

Let's break the chord expression down::

    >>> callback = tsum.subtask()
    >>> header = [add.subtask((i, i)) for i in xrange(100)]
    >>> result = chord(header)(callback)
    >>> result.get()
    9900

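The same header/callback structure can be mimicked locally with `concurrent.futures`. `local_chord` is a hypothetical helper, not Celery's API: it runs every header call in a thread pool, waits for all of them (the synchronization step), and only then applies the callback to the list of results in header order.

.. code-block:: python

    from concurrent.futures import ThreadPoolExecutor


    def add(x, y):
        return x + y


    def tsum(numbers):
        return sum(numbers)


    def local_chord(header, callback, max_workers=8):
        """Hypothetical helper: header is a list of (func, args) pairs."""
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = [pool.submit(func, *args) for func, args in header]
            # Synchronization step: collect every header result, in order.
            results = [f.result() for f in futures]
        # The callback runs only after the whole header has returned.
        return callback(results)


    header = [(add, (i, i)) for i in range(100)]
    total = local_chord(header, tsum)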
Remember, the callback can only be executed after all of the tasks in the
header have returned. Each step in the header is executed as a task, in
parallel, possibly on different nodes. The callback is then applied with
the list of return values from the tasks in the header. The task id returned
by :meth:`chord` is the id of the callback, so you can wait for it to complete
and get the final return value (but remember to :ref:`never have a task wait
for other tasks <task-synchronous-subtasks>`).

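.. _chord-important-notes:

Important Notes
---------------

By default the synchronization step is implemented by having a recurring task
poll the completion of the taskset every second, applying the callback when
ready.

Example implementation:

.. code-block:: python

    from celery.task import task
    from celery.task.sets import subtask

    @task(max_retries=None)  # allow unlimited retries while polling
    def unlock_chord(taskset, callback, interval=1, max_retries=None):
        if taskset.ready():
            return subtask(callback).delay(taskset.join())
        unlock_chord.retry(countdown=interval, max_retries=max_retries)
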
  155. if taskset.ready():
  156. return subtask(callback).delay(taskset.join())
  157. unlock_chord.retry(countdown=interval, max_retries=max_retries)
  158. This is used by all result backends except Redis, which increments a
  159. counter after each task in the header, then applying the callback when the
  160. counter exceeds the number of tasks in the set.
  161. The Redis approach is a much better solution, but not easily implemented
  162. in other backends (suggestions welcome!)
  163. .. note::
  164. If you are using chords with the Redis result backend and also overriding
  165. the :meth:`Task.after_return` method, you need to make sure to call the
  166. super method or else the chord callback will not be applied.
  167. .. code-block:: python
  168. def after_return(self, *args, **kwargs):
  169. do_something()
  170. super(MyTask, self).after_return(*args, **kwargs)
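Why skipping `super()` breaks the chord can be shown with plain classes. In this hypothetical sketch, `BaseTask.after_return` stands in for the base-class hook the note refers to, reduced here to advancing a chord counter; a subclass that forgets to call it never advances the counter. None of these names come from Celery.

.. code-block:: python

    class BaseTask(object):
        """Hypothetical base class whose after_return advances a counter."""

        finished_parts = 0

        def after_return(self, *args, **kwargs):
            # Stand-in for the per-task chord bookkeeping in the base class.
            BaseTask.finished_parts += 1


    class GoodTask(BaseTask):
        def after_return(self, *args, **kwargs):
            self.cleaned_up = True  # your own post-processing
            super(GoodTask, self).after_return(*args, **kwargs)


    class BadTask(BaseTask):
        def after_return(self, *args, **kwargs):
            self.cleaned_up = True  # forgets super(): the counter never moves


    GoodTask().after_return()
    BadTask().after_return()

Only `GoodTask` counted as finished, so a counter waiting for two parts would never reach its target.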