sets.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import warnings
  4. from kombu.utils import cached_property
  5. from celery import registry
  6. from celery.app import app_or_default
  7. from celery.datastructures import AttributeDict
  8. from celery.utils import gen_unique_id, reprcall
  9. from celery.utils.compat import UserList
#: Message template for the :class:`DeprecationWarning` emitted by
#: :class:`TaskSet` when it is created with the old single-task
#: ``TaskSet(task, arglist)`` signature.  The ``%(cls)s`` placeholder is
#: filled in with the class name of the task that was passed.
TASKSET_DEPRECATION_TEXT = """\
Using this invocation of TaskSet is deprecated and will be removed
in Celery v2.4!
TaskSets now supports multiple types of tasks, the API has to reflect
this so the syntax has been changed to:
from celery.task.sets import TaskSet
ts = TaskSet(tasks=[
%(cls)s.subtask(args1, kwargs1, options1),
%(cls)s.subtask(args2, kwargs2, options2),
...
%(cls)s.subtask(argsN, kwargsN, optionsN),
])
result = ts.apply_async()
Thank you for your patience!
"""
  25. class subtask(AttributeDict):
  26. """Class that wraps the arguments and execution options
  27. for a single task invocation.
  28. Used as the parts in a :class:`TaskSet` or to safely
  29. pass tasks around as callbacks.
  30. :param task: Either a task class/instance, or the name of a task.
  31. :keyword args: Positional arguments to apply.
  32. :keyword kwargs: Keyword arguments to apply.
  33. :keyword options: Additional options to
  34. :func:`celery.execute.apply_async`.
  35. Note that if the first argument is a :class:`dict`, the other
  36. arguments will be ignored and the values in the dict will be used
  37. instead.
  38. >>> s = subtask("tasks.add", args=(2, 2))
  39. >>> subtask(s)
  40. {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
  41. """
  42. def __init__(self, task=None, args=None, kwargs=None, options=None, **ex):
  43. init = super(subtask, self).__init__
  44. if isinstance(task, dict):
  45. return init(task) # works like dict(d)
  46. # Also supports using task class/instance instead of string name.
  47. try:
  48. task_name = task.name
  49. except AttributeError:
  50. task_name = task
  51. init(task=task_name, args=tuple(args or ()),
  52. kwargs=dict(kwargs or {}, **ex),
  53. options=options or {})
  54. def delay(self, *argmerge, **kwmerge):
  55. """Shortcut to `apply_async(argmerge, kwargs)`."""
  56. return self.apply_async(args=argmerge, kwargs=kwmerge)
  57. def apply(self, args=(), kwargs={}, **options):
  58. """Apply this task locally."""
  59. # For callbacks: extra args are prepended to the stored args.
  60. args = tuple(args) + tuple(self.args)
  61. kwargs = dict(self.kwargs, **kwargs)
  62. options = dict(self.options, **options)
  63. return self.type.apply(args, kwargs, **options)
  64. def apply_async(self, args=(), kwargs={}, **options):
  65. """Apply this task asynchronously."""
  66. # For callbacks: extra args are prepended to the stored args.
  67. args = tuple(args) + tuple(self.args)
  68. kwargs = dict(self.kwargs, **kwargs)
  69. options = dict(self.options, **options)
  70. return self.type.apply_async(args, kwargs, **options)
  71. def __reduce__(self):
  72. # for serialization, the task type is lazily loaded,
  73. # and not stored in the dict itself.
  74. return (self.__class__, (dict(self), ), None)
  75. def __repr__(self):
  76. return reprcall(self["task"], self["args"], self["kwargs"])
  77. @cached_property
  78. def type(self):
  79. return registry.tasks[self.task]
  80. class TaskSet(UserList):
  81. """A task containing several subtasks, making it possible
  82. to track how many, or when all of the tasks have been completed.
  83. :param tasks: A list of :class:`subtask` instances.
  84. Example::
  85. >>> urls = ("http://cnn.com/rss", "http://bbc.co.uk/rss")
  86. >>> taskset = TaskSet(refresh_feed.subtask((url, )) for url in urls)
  87. >>> taskset_result = taskset.apply_async()
  88. >>> list_of_return_values = taskset_result.join() # *expensive*
  89. """
  90. _task = None # compat
  91. _task_name = None # compat
  92. #: Total number of subtasks in this set.
  93. total = None
  94. def __init__(self, task=None, tasks=None, app=None, Publisher=None):
  95. self.app = app_or_default(app)
  96. if task is not None:
  97. if hasattr(task, "__iter__"):
  98. tasks = task
  99. else:
  100. # Previously TaskSet only supported applying one kind of task.
  101. # the signature then was TaskSet(task, arglist),
  102. # so convert the arguments to subtasks'.
  103. tasks = [subtask(task, *arglist) for arglist in tasks]
  104. task = self._task = registry.tasks[task.name]
  105. self._task_name = task.name
  106. warnings.warn(TASKSET_DEPRECATION_TEXT % {
  107. "cls": task.__class__.__name__},
  108. DeprecationWarning)
  109. self.data = list(tasks or [])
  110. self.total = len(self.tasks)
  111. self.Publisher = Publisher or self.app.amqp.TaskPublisher
  112. def apply_async(self, connection=None, connect_timeout=None,
  113. publisher=None, taskset_id=None):
  114. """Apply taskset."""
  115. app = self.app
  116. if app.conf.CELERY_ALWAYS_EAGER:
  117. return self.apply(taskset_id=taskset_id)
  118. with app.default_connection(connection, connect_timeout) as conn:
  119. setid = taskset_id or gen_unique_id()
  120. pub = publisher or self.Publisher(connection=conn)
  121. try:
  122. results = self._async_results(setid, pub)
  123. finally:
  124. if not publisher: # created by us.
  125. pub.close()
  126. return app.TaskSetResult(setid, results)
  127. def _async_results(self, taskset_id, publisher):
  128. return [task.apply_async(taskset_id=taskset_id, publisher=publisher)
  129. for task in self.tasks]
  130. def apply(self, taskset_id=None):
  131. """Applies the taskset locally by blocking until all tasks return."""
  132. setid = taskset_id or gen_unique_id()
  133. return self.app.TaskSetResult(setid, self._sync_results(setid))
  134. def _sync_results(self, taskset_id):
  135. return [task.apply(taskset_id=taskset_id) for task in self.tasks]
  136. @property
  137. def tasks(self):
  138. return self.data
  139. @property
  140. def task(self):
  141. warnings.warn(
  142. "TaskSet.task is deprecated and will be removed in 1.4",
  143. DeprecationWarning)
  144. return self._task
  145. @property
  146. def task_name(self):
  147. warnings.warn(
  148. "TaskSet.task_name is deprecated and will be removed in 1.4",
  149. DeprecationWarning)
  150. return self._task_name