sets.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. import warnings
  2. from UserList import UserList
  3. from celery import conf
  4. from celery import registry
  5. from celery.datastructures import AttributeDict
  6. from celery.messaging import establish_connection, with_connection
  7. from celery.messaging import TaskPublisher
  8. from celery.result import TaskSetResult
  9. from celery.utils import gen_unique_id
  10. TASKSET_DEPRECATION_TEXT = """\
  11. Using this invocation of TaskSet is deprecated and will be removed
  12. in Celery v2.4!
  13. TaskSets now supports multiple types of tasks, the API has to reflect
  14. this so the syntax has been changed to:
  15. from celery.task.sets import TaskSet
  16. ts = TaskSet(tasks=[
  17. %(cls)s.subtask(args1, kwargs1, options1),
  18. %(cls)s.subtask(args2, kwargs2, options2),
  19. %(cls)s.subtask(args3, kwargs3, options3),
  20. ...
  21. %(cls)s.subtask(argsN, kwargsN, optionsN),
  22. ])
  23. result = ts.apply_async()
  24. Thank you for your patience!
  25. """
  26. class subtask(AttributeDict):
  27. """Class that wraps the arguments and execution options
  28. for a single task invocation.
  29. Used as the parts in a :class:`TaskSet` or to safely
  30. pass tasks around as callbacks.
  31. :param task: Either a task class/instance, or the name of a task.
  32. :keyword args: Positional arguments to apply.
  33. :keyword kwargs: Keyword arguments to apply.
  34. :keyword options: Additional options to
  35. :func:`celery.execute.apply_async`.
  36. Note that if the first argument is a :class:`dict`, the other
  37. arguments will be ignored and the values in the dict will be used
  38. instead.
  39. >>> s = subtask("tasks.add", args=(2, 2))
  40. >>> subtask(s)
  41. {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
  42. """
  43. def __init__(self, task=None, args=None, kwargs=None, options=None,
  44. **extra):
  45. init = super(subtask, self).__init__
  46. if isinstance(task, dict):
  47. # Use the values from a dict.
  48. return init(task)
  49. # Also supports using task class/instance instead of string name.
  50. try:
  51. task_name = task.name
  52. except AttributeError:
  53. task_name = task
  54. init(task=task_name, args=tuple(args or ()), kwargs=kwargs or (),
  55. options=options or ())
  56. def delay(self, *argmerge, **kwmerge):
  57. """Shortcut to ``apply_async(argmerge, kwargs)``."""
  58. return self.apply_async(args=argmerge, kwargs=kwmerge)
  59. def apply(self, args, kwargs, **options):
  60. """Apply this task locally."""
  61. # For callbacks: extra args are prepended to the stored args.
  62. args = tuple(args) + tuple(self.args)
  63. kwargs = dict(self.kwargs, **kwargs)
  64. options = dict(self.options, **options)
  65. return self.get_type().apply(args, kwargs, options)
  66. def apply_async(self, args, kwargs, **options):
  67. """Apply this task asynchronously."""
  68. # For callbacks: extra args are prepended to the stored args.
  69. args = tuple(args) + tuple(self.args)
  70. kwargs = dict(self.kwargs, **kwargs)
  71. options = dict(self.options, **options)
  72. return self.get_type().apply_async(args, kwargs, **options)
  73. def get_type(self):
  74. # For JSON serialization, the task class is lazily loaded,
  75. # and not stored in the dict itself.
  76. return registry.tasks[self.task]
  77. class TaskSet(UserList):
  78. """A task containing several subtasks, making it possible
  79. to track how many, or when all of the tasks has been completed.
  80. :param tasks: A list of :class:`subtask` instances.
  81. .. attribute:: total
  82. Total number of subtasks in this task set.
  83. Example::
  84. >>> from djangofeeds.tasks import RefreshFeedTask
  85. >>> from celery.task.sets import TaskSet, subtask
  86. >>> urls = ("http://cnn.com/rss",
  87. ... "http://bbc.co.uk/rss",
  88. ... "http://xkcd.com/rss")
  89. >>> subtasks = [RefreshFeedTask.subtask(kwargs={"feed_url": url})
  90. ... for url in urls]
  91. >>> taskset = TaskSet(tasks=subtasks)
  92. >>> taskset_result = taskset.apply_async()
  93. >>> list_of_return_values = taskset_result.join()
  94. """
  95. _task = None # compat
  96. _task_name = None # compat
  97. def __init__(self, task=None, tasks=None):
  98. if task is not None:
  99. if hasattr(task, "__iter__"):
  100. tasks = task
  101. else:
  102. # Previously TaskSet only supported applying one kind of task.
  103. # the signature then was TaskSet(task, arglist),
  104. # so convert the arguments to subtasks'.
  105. tasks = [subtask(task, *arglist) for arglist in tasks]
  106. task = self._task = registry.tasks[task.name]
  107. self._task_name = task.name
  108. warnings.warn(TASKSET_DEPRECATION_TEXT % {
  109. "cls": task.__class__.__name__},
  110. DeprecationWarning)
  111. self.data = list(tasks)
  112. self.total = len(self.tasks)
  113. @with_connection
  114. def apply_async(self, connection=None,
  115. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  116. """Run all tasks in the taskset.
  117. Returns a :class:`celery.result.TaskSetResult` instance.
  118. Example
  119. >>> ts = TaskSet(tasks=(
  120. ... RefreshFeedTask.subtask(["http://foo.com/rss"]),
  121. ... RefreshFeedTask.subtask(["http://bar.com/rss"]),
  122. ... ))
  123. >>> result = ts.apply_async()
  124. >>> result.taskset_id
  125. "d2c9b261-8eff-4bfb-8459-1e1b72063514"
  126. >>> result.subtask_ids
  127. ["b4996460-d959-49c8-aeb9-39c530dcde25",
  128. "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
  129. >>> result.waiting()
  130. True
  131. >>> time.sleep(10)
  132. >>> result.ready()
  133. True
  134. >>> result.successful()
  135. True
  136. >>> result.failed()
  137. False
  138. >>> result.join()
  139. [True, True]
  140. """
  141. if conf.ALWAYS_EAGER:
  142. return self.apply()
  143. taskset_id = gen_unique_id()
  144. publisher = TaskPublisher(connection=connection)
  145. try:
  146. results = [task.apply_async(taskset_id=taskset_id,
  147. publisher=publisher)
  148. for task in self.tasks]
  149. finally:
  150. publisher.close()
  151. return TaskSetResult(taskset_id, results)
  152. def apply(self):
  153. """Applies the taskset locally."""
  154. taskset_id = gen_unique_id()
  155. # This will be filled with EagerResults.
  156. return TaskSetResult(taskset_id, [task.apply(taskset_id=taskset_id)
  157. for task in self.tasks])
  158. @property
  159. def tasks(self):
  160. return self.data
  161. @property
  162. def task(self):
  163. warnings.warn(
  164. "TaskSet.task is deprecated and will be removed in 1.4",
  165. DeprecationWarning)
  166. return self._task
  167. @property
  168. def task_name(self):
  169. warnings.warn(
  170. "TaskSet.task_name is deprecated and will be removed in 1.4",
  171. DeprecationWarning)
  172. return self._task_name