sets.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import warnings
  2. from UserList import UserList
  3. from celery import registry
  4. from celery.app import app_or_default
  5. from celery.datastructures import AttributeDict
  6. from celery.utils import gen_unique_id
# Message template for the DeprecationWarning raised when TaskSet is
# constructed with the legacy ``TaskSet(task, arglist)`` signature.
# ``%(cls)s`` is substituted with the class name of the task being applied.
TASKSET_DEPRECATION_TEXT = """\
Using this invocation of TaskSet is deprecated and will be removed
in Celery v2.4!
TaskSets now supports multiple types of tasks, the API has to reflect
this so the syntax has been changed to:
from celery.task.sets import TaskSet
ts = TaskSet(tasks=[
%(cls)s.subtask(args1, kwargs1, options1),
%(cls)s.subtask(args2, kwargs2, options2),
%(cls)s.subtask(args3, kwargs3, options3),
...
%(cls)s.subtask(argsN, kwargsN, optionsN),
])
result = ts.apply_async()
Thank you for your patience!
"""
  23. class subtask(AttributeDict):
  24. """Class that wraps the arguments and execution options
  25. for a single task invocation.
  26. Used as the parts in a :class:`TaskSet` or to safely
  27. pass tasks around as callbacks.
  28. :param task: Either a task class/instance, or the name of a task.
  29. :keyword args: Positional arguments to apply.
  30. :keyword kwargs: Keyword arguments to apply.
  31. :keyword options: Additional options to
  32. :func:`celery.execute.apply_async`.
  33. Note that if the first argument is a :class:`dict`, the other
  34. arguments will be ignored and the values in the dict will be used
  35. instead.
  36. >>> s = subtask("tasks.add", args=(2, 2))
  37. >>> subtask(s)
  38. {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
  39. """
  40. def __init__(self, task=None, args=None, kwargs=None, options=None,
  41. **extra):
  42. init = super(subtask, self).__init__
  43. if isinstance(task, dict):
  44. # Use the values from a dict.
  45. return init(task)
  46. # Also supports using task class/instance instead of string name.
  47. try:
  48. task_name = task.name
  49. except AttributeError:
  50. task_name = task
  51. init(task=task_name, args=tuple(args or ()),
  52. kwargs=dict(kwargs or {}, **extra),
  53. options=options or {})
  54. def delay(self, *argmerge, **kwmerge):
  55. """Shortcut to ``apply_async(argmerge, kwargs)``."""
  56. return self.apply_async(args=argmerge, kwargs=kwmerge)
  57. def apply(self, args=(), kwargs={}, **options):
  58. """Apply this task locally."""
  59. # For callbacks: extra args are prepended to the stored args.
  60. args = tuple(args) + tuple(self.args)
  61. kwargs = dict(self.kwargs, **kwargs)
  62. options = dict(self.options, **options)
  63. return self.get_type().apply(args, kwargs, **options)
  64. def apply_async(self, args=(), kwargs={}, **options):
  65. """Apply this task asynchronously."""
  66. # For callbacks: extra args are prepended to the stored args.
  67. args = tuple(args) + tuple(self.args)
  68. kwargs = dict(self.kwargs, **kwargs)
  69. options = dict(self.options, **options)
  70. return self.get_type().apply_async(args, kwargs, **options)
  71. def get_type(self):
  72. # For JSON serialization, the task class is lazily loaded,
  73. # and not stored in the dict itself.
  74. return registry.tasks[self.task]
  75. class TaskSet(UserList):
  76. """A task containing several subtasks, making it possible
  77. to track how many, or when all of the tasks has been completed.
  78. :param tasks: A list of :class:`subtask` instances.
  79. .. attribute:: total
  80. Total number of subtasks in this task set.
  81. Example::
  82. >>> from djangofeeds.tasks import RefreshFeedTask
  83. >>> from celery.task.sets import TaskSet, subtask
  84. >>> urls = ("http://cnn.com/rss",
  85. ... "http://bbc.co.uk/rss",
  86. ... "http://xkcd.com/rss")
  87. >>> subtasks = [RefreshFeedTask.subtask(kwargs={"feed_url": url})
  88. ... for url in urls]
  89. >>> taskset = TaskSet(tasks=subtasks)
  90. >>> taskset_result = taskset.apply_async()
  91. >>> list_of_return_values = taskset_result.join()
  92. """
  93. _task = None # compat
  94. _task_name = None # compat
  95. def __init__(self, task=None, tasks=None, app=None):
  96. if task is not None:
  97. if hasattr(task, "__iter__"):
  98. tasks = task
  99. else:
  100. # Previously TaskSet only supported applying one kind of task.
  101. # the signature then was TaskSet(task, arglist),
  102. # so convert the arguments to subtasks'.
  103. tasks = [subtask(task, *arglist) for arglist in tasks]
  104. task = self._task = registry.tasks[task.name]
  105. self._task_name = task.name
  106. warnings.warn(TASKSET_DEPRECATION_TEXT % {
  107. "cls": task.__class__.__name__},
  108. DeprecationWarning)
  109. self.app = app_or_default(app)
  110. self.data = list(tasks)
  111. self.total = len(self.tasks)
  112. def apply_async(self, connection=None, connect_timeout=None):
  113. return self.app.with_default_connection(self._apply_async)(
  114. connection=connection, connect_timeout=connect_timeout)
  115. def _apply_async(self, connection=None, connect_timeout=None):
  116. """Run all tasks in the taskset.
  117. Returns a :class:`celery.result.TaskSetResult` instance.
  118. Example
  119. >>> ts = TaskSet(tasks=(
  120. ... RefreshFeedTask.subtask(["http://foo.com/rss"]),
  121. ... RefreshFeedTask.subtask(["http://bar.com/rss"]),
  122. ... ))
  123. >>> result = ts.apply_async()
  124. >>> result.taskset_id
  125. "d2c9b261-8eff-4bfb-8459-1e1b72063514"
  126. >>> result.subtask_ids
  127. ["b4996460-d959-49c8-aeb9-39c530dcde25",
  128. "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
  129. >>> result.waiting()
  130. True
  131. >>> time.sleep(10)
  132. >>> result.ready()
  133. True
  134. >>> result.successful()
  135. True
  136. >>> result.failed()
  137. False
  138. >>> result.join()
  139. [True, True]
  140. """
  141. if self.app.conf.CELERY_ALWAYS_EAGER:
  142. return self.apply()
  143. taskset_id = gen_unique_id()
  144. publisher = self.app.amqp.TaskPublisher(connection=connection)
  145. try:
  146. results = [task.apply_async(taskset_id=taskset_id,
  147. publisher=publisher)
  148. for task in self.tasks]
  149. finally:
  150. publisher.close()
  151. return self.app.TaskSetResult(taskset_id, results)
  152. def apply(self):
  153. """Applies the taskset locally."""
  154. taskset_id = gen_unique_id()
  155. # this will be filled with EagerResults.
  156. results = [task.apply(taskset_id=taskset_id)
  157. for task in self.tasks]
  158. return self.app.TaskSetResult(taskset_id, results)
  159. @property
  160. def tasks(self):
  161. return self.data
  162. @property
  163. def task(self):
  164. warnings.warn(
  165. "TaskSet.task is deprecated and will be removed in 1.4",
  166. DeprecationWarning)
  167. return self._task
  168. @property
  169. def task_name(self):
  170. warnings.warn(
  171. "TaskSet.task_name is deprecated and will be removed in 1.4",
  172. DeprecationWarning)
  173. return self._task_name