sets.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task.sets
  4. ~~~~~~~~~~~~~~~~
  5. Creating and applying groups of tasks.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import warnings
  12. from .. import registry
  13. from ..app import app_or_default
  14. from ..datastructures import AttributeDict
  15. from ..exceptions import CDeprecationWarning
  16. from ..utils import cached_property, reprcall, uuid
  17. from ..utils.compat import UserList
# Deprecation warning emitted by ``TaskSet.__init__`` when it is called with
# the legacy ``TaskSet(task, arglist)`` signature (a single task plus a list
# of argument tuples).  ``%(cls)s`` is interpolated there with the class name
# of the task instance looked up in the task registry.
TASKSET_DEPRECATION_TEXT = """\
Using this invocation of TaskSet is deprecated and will be removed
in Celery v2.4!
TaskSets now supports multiple types of tasks, the API has to reflect
this so the syntax has been changed to:
from celery.task import TaskSet
ts = TaskSet(tasks=[
%(cls)s.subtask(args1, kwargs1, options1),
%(cls)s.subtask(args2, kwargs2, options2),
...
%(cls)s.subtask(argsN, kwargsN, optionsN),
])
result = ts.apply_async()
Thank you for your patience!
"""
  33. class subtask(AttributeDict):
  34. """Class that wraps the arguments and execution options
  35. for a single task invocation.
  36. Used as the parts in a :class:`TaskSet` or to safely
  37. pass tasks around as callbacks.
  38. :param task: Either a task class/instance, or the name of a task.
  39. :keyword args: Positional arguments to apply.
  40. :keyword kwargs: Keyword arguments to apply.
  41. :keyword options: Additional options to
  42. :func:`celery.execute.apply_async`.
  43. Note that if the first argument is a :class:`dict`, the other
  44. arguments will be ignored and the values in the dict will be used
  45. instead.
  46. >>> s = subtask("tasks.add", args=(2, 2))
  47. >>> subtask(s)
  48. {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
  49. """
  50. def __init__(self, task=None, args=None, kwargs=None, options=None, **ex):
  51. init = super(subtask, self).__init__
  52. if isinstance(task, dict):
  53. return init(task) # works like dict(d)
  54. # Also supports using task class/instance instead of string name.
  55. try:
  56. task_name = task.name
  57. except AttributeError:
  58. task_name = task
  59. init(task=task_name, args=tuple(args or ()),
  60. kwargs=dict(kwargs or {}, **ex),
  61. options=options or {})
  62. def delay(self, *argmerge, **kwmerge):
  63. """Shortcut to `apply_async(argmerge, kwargs)`."""
  64. return self.apply_async(args=argmerge, kwargs=kwmerge)
  65. def apply(self, args=(), kwargs={}, **options):
  66. """Apply this task locally."""
  67. # For callbacks: extra args are prepended to the stored args.
  68. args = tuple(args) + tuple(self.args)
  69. kwargs = dict(self.kwargs, **kwargs)
  70. options = dict(self.options, **options)
  71. return self.type.apply(args, kwargs, **options)
  72. def apply_async(self, args=(), kwargs={}, **options):
  73. """Apply this task asynchronously."""
  74. # For callbacks: extra args are prepended to the stored args.
  75. args = tuple(args) + tuple(self.args)
  76. kwargs = dict(self.kwargs, **kwargs)
  77. options = dict(self.options, **options)
  78. return self.type.apply_async(args, kwargs, **options)
  79. def __reduce__(self):
  80. # for serialization, the task type is lazily loaded,
  81. # and not stored in the dict itself.
  82. return (self.__class__, (dict(self), ), None)
  83. def __repr__(self):
  84. return reprcall(self["task"], self["args"], self["kwargs"])
  85. @cached_property
  86. def type(self):
  87. return registry.tasks[self.task]
  88. def maybe_subtask(t):
  89. if not isinstance(t, subtask):
  90. return subtask(t)
  91. return t
  92. class TaskSet(UserList):
  93. """A task containing several subtasks, making it possible
  94. to track how many, or when all of the tasks have been completed.
  95. :param tasks: A list of :class:`subtask` instances.
  96. Example::
  97. >>> urls = ("http://cnn.com/rss", "http://bbc.co.uk/rss")
  98. >>> taskset = TaskSet(refresh_feed.subtask((url, )) for url in urls)
  99. >>> taskset_result = taskset.apply_async()
  100. >>> list_of_return_values = taskset_result.join() # *expensive*
  101. """
  102. _task = None # compat
  103. _task_name = None # compat
  104. #: Total number of subtasks in this set.
  105. total = None
  106. def __init__(self, task=None, tasks=None, app=None, Publisher=None):
  107. self.app = app_or_default(app)
  108. if task is not None:
  109. if hasattr(task, "__iter__"):
  110. tasks = [maybe_subtask(t) for t in task]
  111. else:
  112. # Previously TaskSet only supported applying one kind of task.
  113. # the signature then was TaskSet(task, arglist),
  114. # so convert the arguments to subtasks'.
  115. tasks = [subtask(task, *arglist) for arglist in tasks]
  116. task = self._task = registry.tasks[task.name]
  117. self._task_name = task.name
  118. warnings.warn(TASKSET_DEPRECATION_TEXT % {
  119. "cls": task.__class__.__name__},
  120. CDeprecationWarning)
  121. self.data = list(tasks or [])
  122. self.total = len(self.tasks)
  123. self.Publisher = Publisher or self.app.amqp.TaskPublisher
  124. def apply_async(self, connection=None, connect_timeout=None,
  125. publisher=None, taskset_id=None):
  126. """Apply taskset."""
  127. app = self.app
  128. if app.conf.CELERY_ALWAYS_EAGER:
  129. return self.apply(taskset_id=taskset_id)
  130. with app.default_connection(connection, connect_timeout) as conn:
  131. setid = taskset_id or uuid()
  132. pub = publisher or self.Publisher(connection=conn)
  133. try:
  134. results = self._async_results(setid, pub)
  135. finally:
  136. if not publisher: # created by us.
  137. pub.close()
  138. return app.TaskSetResult(setid, results)
  139. def _async_results(self, taskset_id, publisher):
  140. return [task.apply_async(taskset_id=taskset_id, publisher=publisher)
  141. for task in self.tasks]
  142. def apply(self, taskset_id=None):
  143. """Applies the taskset locally by blocking until all tasks return."""
  144. setid = taskset_id or uuid()
  145. return self.app.TaskSetResult(setid, self._sync_results(setid))
  146. def _sync_results(self, taskset_id):
  147. return [task.apply(taskset_id=taskset_id) for task in self.tasks]
  148. @property
  149. def tasks(self):
  150. return self.data
  151. @property
  152. def task(self):
  153. warnings.warn(
  154. "TaskSet.task is deprecated and will be removed in 1.4",
  155. CDeprecationWarning)
  156. return self._task
  157. @property
  158. def task_name(self):
  159. warnings.warn(
  160. "TaskSet.task_name is deprecated and will be removed in 1.4",
  161. CDeprecationWarning)
  162. return self._task_name