=======================================
 Sets of tasks, Subtasks and Callbacks
=======================================

.. contents::
    :local:

Subtasks
========

The :class:`~celery.task.sets.subtask` class is used to wrap the arguments and
execution options for a single task invocation::

    subtask(task_name_or_cls, args, kwargs, options)

For convenience every task also has a shortcut to create subtask instances::

    task.subtask(args, kwargs, options)

:class:`~celery.task.sets.subtask` is actually a subclass of :class:`dict`,
which means it can be serialized with JSON or other encodings that don't
support complex Python objects.

It can also be regarded as a type, as the following usage works::

    >>> s = subtask("tasks.add", args=(2, 2), kwargs={})
    >>> subtask(dict(s)) # coerce dict into subtask

This makes it excellent as a means to pass callbacks around to tasks.

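Because a subtask is a plain :class:`dict` underneath, it survives a round
trip through JSON. The following is a minimal sketch of that idea, not the
Celery implementation: ``SubtaskSketch`` is a hypothetical stand-in written
for this example, and the key names ``task``, ``args``, ``kwargs`` and
``options`` are assumptions::

    import json


    class SubtaskSketch(dict):
        """Hypothetical stand-in for a dict-based task signature."""

        def __init__(self, task=None, args=(), kwargs=None, options=None):
            if isinstance(task, dict):
                # Coerce a plain dict (e.g. decoded from JSON) into the type.
                super().__init__(task)
            else:
                super().__init__(task=task, args=list(args),
                                 kwargs=dict(kwargs or {}),
                                 options=dict(options or {}))


    s = SubtaskSketch("tasks.add", args=(2, 2))
    payload = json.dumps(s)                         # serializes like any dict
    restored = SubtaskSketch(json.loads(payload))   # coerce dict into the type
    print(restored == s)

The sketch stores ``args`` as a list so that the decoded value compares equal
to the original; JSON has no tuple type.
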
Callbacks
---------

Let's improve our ``add`` task so it can accept a callback that
takes the result as an argument::

    from celery.decorators import task
    from celery.task.sets import subtask

    @task
    def add(x, y, callback=None):
        result = x + y
        if callback is not None:
            # Apply the callback; the result is passed on as its argument.
            subtask(callback).apply_async(result)
        return result

See? :class:`~celery.task.sets.subtask` also knows how it should be applied:
asynchronously by :meth:`~celery.task.sets.subtask.apply_async`, and
eagerly by :meth:`~celery.task.sets.subtask.apply`.

Best of all, any arguments you pass to ``subtask.apply_async``
will be prepended to the arguments specified by the subtask itself!

:note: additional keyword arguments will be added to the
  execution options, not the task keyword arguments.

So if you have the subtask::

    >>> add.subtask(args=(10, ), options={"ignore_result": True})

``subtask.apply_async(result)`` becomes::

    >>> add.apply_async(args=(result, 10), ignore_result=True)

and ``subtask.apply_async(result, ignore_result=False)`` becomes::

    >>> add.apply_async(args=(result, 10), ignore_result=False)

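The merging rule can be modelled in a few lines of plain Python. This is only
a sketch under stated assumptions: ``merge_call`` is a hypothetical helper
that is not part of Celery, the subtask is represented as an ordinary dict,
and the new arguments are passed as a tuple::

    def merge_call(sig, args=(), **options):
        """Return the (args, options) a task would finally be applied with."""
        # New positional arguments go in front of the stored ones.
        merged_args = tuple(args) + tuple(sig.get("args", ()))
        # New keyword arguments override the stored execution options.
        merged_options = dict(sig.get("options", {}), **options)
        return merged_args, merged_options


    sig = {"task": "tasks.add", "args": (10,),
           "options": {"ignore_result": True}}
    result = 4

    print(merge_call(sig, (result,)))
    print(merge_call(sig, (result,), ignore_result=False))

In both calls the merged arguments are ``(4, 10)``; only the
``ignore_result`` option differs.
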
Now let's execute our new ``add`` task with a callback::

    >>> add.delay(2, 2, callback=add.subtask((8, )))

As expected this will first launch one task calculating ``2 + 2``, then
another task calculating ``4 + 8``.

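The order of the two calculations can be traced without a worker. The sketch
below runs everything eagerly in one process and represents the callback as a
hypothetical ``(function, extra_args)`` tuple rather than a real subtask, so
it only illustrates the flow of values::

    calls = []  # records every (x, y) pair that gets added


    def add(x, y, callback=None):
        calls.append((x, y))
        result = x + y
        if callback is not None:
            func, extra_args = callback
            # Prepend the result to the callback's own arguments.
            func(result, *extra_args)
        return result


    add(2, 2, callback=(add, (8,)))
    print(calls)

The recorded calls are ``(2, 2)`` followed by ``(4, 8)``, matching the two
tasks described above.
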
Task Sets
=========

The :class:`~celery.task.sets.TaskSet` enables easy invocation of several
tasks at once, and is then able to join the results in the same order as the
tasks were invoked.

The task set works on a list of :class:`~celery.task.sets.subtask` instances::

    >>> from celery.task.sets import TaskSet
    >>> from tasks import add

    >>> job = TaskSet(tasks=[
    ...             add.subtask((4, 4)),
    ...             add.subtask((8, 8)),
    ...             add.subtask((16, 16)),
    ...             add.subtask((32, 32)),
    ... ])

    >>> result = job.apply_async()

    >>> result.ready()  # have all subtasks completed?
    True
    >>> result.successful() # were all subtasks successful?
    True
    >>> result.join()
    [8, 16, 32, 64]

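Joining in invocation order, rather than completion order, can be
illustrated with the standard library alone. In this sketch a thread pool
stands in for the workers, which is an assumption made for the example and
not how Celery executes tasks::

    import time
    from concurrent.futures import ThreadPoolExecutor


    def add(x, y):
        # Larger inputs sleep less, so completion order differs from call order.
        time.sleep(0.05 / x)
        return x + y


    arguments = [(4, 4), (8, 8), (16, 16), (32, 32)]

    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(add, x, y) for x, y in arguments]
        # "Join": read the results in the order the calls were submitted.
        joined = [future.result() for future in futures]

    print(joined)

Even though the last call is likely to finish first, ``joined`` lists the
sums in the order the calls were made.
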
Results
-------

When a :class:`~celery.task.sets.TaskSet` is applied it returns a
:class:`~celery.result.TaskSetResult` object.

:class:`~celery.result.TaskSetResult` takes a list of
:class:`~celery.result.AsyncResult` instances and operates on them as if they
were a single task.

It supports the following operations:

* :meth:`~celery.result.TaskSetResult.successful`

    Returns :const:`True` if all of the subtasks finished
    successfully (i.e. did not raise an exception).

* :meth:`~celery.result.TaskSetResult.failed`

    Returns :const:`True` if any of the subtasks failed.

* :meth:`~celery.result.TaskSetResult.waiting`

    Returns :const:`True` if any of the subtasks
    is not ready.

* :meth:`~celery.result.TaskSetResult.ready`

    Returns :const:`True` if all of the subtasks
    are ready.

* :meth:`~celery.result.TaskSetResult.completed_count`

    Returns the number of completed subtasks.

* :meth:`~celery.result.TaskSetResult.revoke`

    Revokes all of the subtasks.

* :meth:`~celery.result.TaskSetResult.iterate`

    Iterates over the return values of the subtasks
    as they finish, one by one.

* :meth:`~celery.result.TaskSetResult.join`

    Gathers the results for all of the subtasks
    and returns them as a list, in the order in which they
    were called.

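The way these predicates combine the states of the individual subtasks can
be sketched as follows. ``ResultSetSketch`` is a hypothetical class written
for this example, each subtask is reduced to a ``(state, value)`` pair, and
counting only successful subtasks as completed is an assumption of the
sketch::

    SUCCESS, FAILURE, PENDING = "SUCCESS", "FAILURE", "PENDING"


    class ResultSetSketch:
        """Hypothetical aggregate over a list of (state, value) pairs."""

        def __init__(self, results):
            self.results = list(results)

        def successful(self):
            return all(state == SUCCESS for state, _ in self.results)

        def failed(self):
            return any(state == FAILURE for state, _ in self.results)

        def ready(self):
            return all(state != PENDING for state, _ in self.results)

        def waiting(self):
            return not self.ready()

        def completed_count(self):
            # Assumption: only successful subtasks count as completed.
            return sum(state == SUCCESS for state, _ in self.results)

        def join(self):
            # Values come back in the order the subtasks were listed.
            return [value for _, value in self.results]


    partial = ResultSetSketch([(SUCCESS, 8), (PENDING, None), (FAILURE, None)])
    print(partial.ready(), partial.waiting(), partial.failed())
    print(partial.successful(), partial.completed_count())

    done = ResultSetSketch([(SUCCESS, 8), (SUCCESS, 16)])
    print(done.successful(), done.join())

Note that ``successful`` and ``ready`` require every subtask to qualify,
while ``failed`` and ``waiting`` are triggered by a single subtask.
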