sets.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import warnings
  4. from .. import registry
  5. from ..app import app_or_default
  6. from ..datastructures import AttributeDict
  7. from ..utils import cached_property, reprcall, uuid
  8. from ..utils.compat import UserList
# Warning text emitted by ``TaskSet.__init__`` when it is called with the
# legacy ``TaskSet(task, arglist)`` signature.  It is a %-format template:
# ``%(cls)s`` is filled in with the class name of the task that was passed.
TASKSET_DEPRECATION_TEXT = """\
Using this invocation of TaskSet is deprecated and will be removed
in Celery v2.4!
TaskSets now supports multiple types of tasks, the API has to reflect
this so the syntax has been changed to:
from celery.task import TaskSet
ts = TaskSet(tasks=[
%(cls)s.subtask(args1, kwargs1, options1),
%(cls)s.subtask(args2, kwargs2, options2),
...
%(cls)s.subtask(argsN, kwargsN, optionsN),
])
result = ts.apply_async()
Thank you for your patience!
"""
  24. class subtask(AttributeDict):
  25. """Class that wraps the arguments and execution options
  26. for a single task invocation.
  27. Used as the parts in a :class:`TaskSet` or to safely
  28. pass tasks around as callbacks.
  29. :param task: Either a task class/instance, or the name of a task.
  30. :keyword args: Positional arguments to apply.
  31. :keyword kwargs: Keyword arguments to apply.
  32. :keyword options: Additional options to
  33. :func:`celery.execute.apply_async`.
  34. Note that if the first argument is a :class:`dict`, the other
  35. arguments will be ignored and the values in the dict will be used
  36. instead.
  37. >>> s = subtask("tasks.add", args=(2, 2))
  38. >>> subtask(s)
  39. {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
  40. """
  41. def __init__(self, task=None, args=None, kwargs=None, options=None, **ex):
  42. init = super(subtask, self).__init__
  43. if isinstance(task, dict):
  44. return init(task) # works like dict(d)
  45. # Also supports using task class/instance instead of string name.
  46. try:
  47. task_name = task.name
  48. except AttributeError:
  49. task_name = task
  50. init(task=task_name, args=tuple(args or ()),
  51. kwargs=dict(kwargs or {}, **ex),
  52. options=options or {})
  53. def delay(self, *argmerge, **kwmerge):
  54. """Shortcut to `apply_async(argmerge, kwargs)`."""
  55. return self.apply_async(args=argmerge, kwargs=kwmerge)
  56. def apply(self, args=(), kwargs={}, **options):
  57. """Apply this task locally."""
  58. # For callbacks: extra args are prepended to the stored args.
  59. args = tuple(args) + tuple(self.args)
  60. kwargs = dict(self.kwargs, **kwargs)
  61. options = dict(self.options, **options)
  62. return self.type.apply(args, kwargs, **options)
  63. def apply_async(self, args=(), kwargs={}, **options):
  64. """Apply this task asynchronously."""
  65. # For callbacks: extra args are prepended to the stored args.
  66. args = tuple(args) + tuple(self.args)
  67. kwargs = dict(self.kwargs, **kwargs)
  68. options = dict(self.options, **options)
  69. return self.type.apply_async(args, kwargs, **options)
  70. def __reduce__(self):
  71. # for serialization, the task type is lazily loaded,
  72. # and not stored in the dict itself.
  73. return (self.__class__, (dict(self), ), None)
  74. def __repr__(self):
  75. return reprcall(self["task"], self["args"], self["kwargs"])
  76. @cached_property
  77. def type(self):
  78. return registry.tasks[self.task]
  79. class TaskSet(UserList):
  80. """A task containing several subtasks, making it possible
  81. to track how many, or when all of the tasks have been completed.
  82. :param tasks: A list of :class:`subtask` instances.
  83. Example::
  84. >>> urls = ("http://cnn.com/rss", "http://bbc.co.uk/rss")
  85. >>> taskset = TaskSet(refresh_feed.subtask((url, )) for url in urls)
  86. >>> taskset_result = taskset.apply_async()
  87. >>> list_of_return_values = taskset_result.join() # *expensive*
  88. """
  89. _task = None # compat
  90. _task_name = None # compat
  91. #: Total number of subtasks in this set.
  92. total = None
  93. def __init__(self, task=None, tasks=None, app=None, Publisher=None):
  94. self.app = app_or_default(app)
  95. if task is not None:
  96. if hasattr(task, "__iter__"):
  97. tasks = task
  98. else:
  99. # Previously TaskSet only supported applying one kind of task.
  100. # the signature then was TaskSet(task, arglist),
  101. # so convert the arguments to subtasks'.
  102. tasks = [subtask(task, *arglist) for arglist in tasks]
  103. task = self._task = registry.tasks[task.name]
  104. self._task_name = task.name
  105. warnings.warn(TASKSET_DEPRECATION_TEXT % {
  106. "cls": task.__class__.__name__},
  107. DeprecationWarning)
  108. self.data = list(tasks or [])
  109. self.total = len(self.tasks)
  110. self.Publisher = Publisher or self.app.amqp.TaskPublisher
  111. def apply_async(self, connection=None, connect_timeout=None,
  112. publisher=None, taskset_id=None):
  113. """Apply taskset."""
  114. app = self.app
  115. if app.conf.CELERY_ALWAYS_EAGER:
  116. return self.apply(taskset_id=taskset_id)
  117. with app.default_connection(connection, connect_timeout) as conn:
  118. setid = taskset_id or uuid()
  119. pub = publisher or self.Publisher(connection=conn)
  120. try:
  121. results = self._async_results(setid, pub)
  122. finally:
  123. if not publisher: # created by us.
  124. pub.close()
  125. return app.TaskSetResult(setid, results)
  126. def _async_results(self, taskset_id, publisher):
  127. return [task.apply_async(taskset_id=taskset_id, publisher=publisher)
  128. for task in self.tasks]
  129. def apply(self, taskset_id=None):
  130. """Applies the taskset locally by blocking until all tasks return."""
  131. setid = taskset_id or uuid()
  132. return self.app.TaskSetResult(setid, self._sync_results(setid))
  133. def _sync_results(self, taskset_id):
  134. return [task.apply(taskset_id=taskset_id) for task in self.tasks]
  135. @property
  136. def tasks(self):
  137. return self.data
  138. @property
  139. def task(self):
  140. warnings.warn(
  141. "TaskSet.task is deprecated and will be removed in 1.4",
  142. DeprecationWarning)
  143. return self._task
  144. @property
  145. def task_name(self):
  146. warnings.warn(
  147. "TaskSet.task_name is deprecated and will be removed in 1.4",
  148. DeprecationWarning)
  149. return self._task_name