sets.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. import warnings
  2. from kombu.utils import cached_property
  3. from celery import registry
  4. from celery.app import app_or_default
  5. from celery.datastructures import AttributeDict
  6. from celery.utils import gen_unique_id
  7. from celery.utils.compat import UserList
#: Warning text used by :class:`TaskSet` when it is created with the old
#: ``TaskSet(task, arglist)`` signature.  The ``%(cls)s`` placeholder is
#: filled in with the class name of the task that was passed.
TASKSET_DEPRECATION_TEXT = """\
Using this invocation of TaskSet is deprecated and will be removed
in Celery v2.4!
TaskSets now supports multiple types of tasks, the API has to reflect
this so the syntax has been changed to:
from celery.task.sets import TaskSet
ts = TaskSet(tasks=[
%(cls)s.subtask(args1, kwargs1, options1),
%(cls)s.subtask(args2, kwargs2, options2),
...
%(cls)s.subtask(argsN, kwargsN, optionsN),
])
result = ts.apply_async()
Thank you for your patience!
"""
  23. class subtask(AttributeDict):
  24. """Class that wraps the arguments and execution options
  25. for a single task invocation.
  26. Used as the parts in a :class:`TaskSet` or to safely
  27. pass tasks around as callbacks.
  28. :param task: Either a task class/instance, or the name of a task.
  29. :keyword args: Positional arguments to apply.
  30. :keyword kwargs: Keyword arguments to apply.
  31. :keyword options: Additional options to
  32. :func:`celery.execute.apply_async`.
  33. Note that if the first argument is a :class:`dict`, the other
  34. arguments will be ignored and the values in the dict will be used
  35. instead.
  36. >>> s = subtask("tasks.add", args=(2, 2))
  37. >>> subtask(s)
  38. {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
  39. """
  40. def __init__(self, task=None, args=None, kwargs=None, options=None, **ex):
  41. init = super(subtask, self).__init__
  42. if isinstance(task, dict):
  43. return init(task) # works like dict(d)
  44. # Also supports using task class/instance instead of string name.
  45. try:
  46. task_name = task.name
  47. except AttributeError:
  48. task_name = task
  49. init(task=task_name, args=tuple(args or ()),
  50. kwargs=dict(kwargs or {}, **ex),
  51. options=options or {})
  52. def delay(self, *argmerge, **kwmerge):
  53. """Shortcut to `apply_async(argmerge, kwargs)`."""
  54. return self.apply_async(args=argmerge, kwargs=kwmerge)
  55. def apply(self, args=(), kwargs={}, **options):
  56. """Apply this task locally."""
  57. # For callbacks: extra args are prepended to the stored args.
  58. args = tuple(args) + tuple(self.args)
  59. kwargs = dict(self.kwargs, **kwargs)
  60. options = dict(self.options, **options)
  61. return self.type.apply(args, kwargs, **options)
  62. def apply_async(self, args=(), kwargs={}, **options):
  63. """Apply this task asynchronously."""
  64. # For callbacks: extra args are prepended to the stored args.
  65. args = tuple(args) + tuple(self.args)
  66. kwargs = dict(self.kwargs, **kwargs)
  67. options = dict(self.options, **options)
  68. return self.type.apply_async(args, kwargs, **options)
  69. def get_type(self):
  70. return self.type
  71. def __reduce__(self):
  72. # for serialization, the task type is lazily loaded,
  73. # and not stored in the dict itself.
  74. return (self.__class__, (dict(self), ), None)
  75. def __repr__(self, kwformat=lambda i: "%s=%r" % i, sep=', '):
  76. kw = self["kwargs"]
  77. return "%s(%s%s%s)" % (self["task"], sep.join(map(repr, self["args"])),
  78. kw and sep or "", sep.join(map(kwformat, kw.iteritems())))
  79. @cached_property
  80. def type(self):
  81. return registry.tasks[self.task]
  82. class TaskSet(UserList):
  83. """A task containing several subtasks, making it possible
  84. to track how many, or when all of the tasks have been completed.
  85. :param tasks: A list of :class:`subtask` instances.
  86. Example::
  87. >>> urls = ("http://cnn.com/rss", "http://bbc.co.uk/rss")
  88. >>> taskset = TaskSet(refresh_feed.subtask((url, )) for url in urls)
  89. >>> taskset_result = taskset.apply_async()
  90. >>> list_of_return_values = taskset_result.join() # *expensive*
  91. """
  92. _task = None # compat
  93. _task_name = None # compat
  94. #: Total number of subtasks in this set.
  95. total = None
  96. def __init__(self, task=None, tasks=None, app=None, Publisher=None):
  97. self.app = app_or_default(app)
  98. if task is not None:
  99. if hasattr(task, "__iter__"):
  100. tasks = task
  101. else:
  102. # Previously TaskSet only supported applying one kind of task.
  103. # the signature then was TaskSet(task, arglist),
  104. # so convert the arguments to subtasks'.
  105. tasks = [subtask(task, *arglist) for arglist in tasks]
  106. task = self._task = registry.tasks[task.name]
  107. self._task_name = task.name
  108. warnings.warn(TASKSET_DEPRECATION_TEXT % {
  109. "cls": task.__class__.__name__},
  110. DeprecationWarning)
  111. self.data = list(tasks or [])
  112. self.total = len(self.tasks)
  113. self.Publisher = Publisher or self.app.amqp.TaskPublisher
  114. def apply_async(self, connection=None, connect_timeout=None,
  115. publisher=None, taskset_id=None):
  116. """Apply taskset."""
  117. return self.app.with_default_connection(self._apply_async)(
  118. connection=connection,
  119. connect_timeout=connect_timeout,
  120. publisher=publisher,
  121. taskset_id=taskset_id)
  122. def _apply_async(self, connection=None, connect_timeout=None,
  123. publisher=None, taskset_id=None):
  124. if self.app.conf.CELERY_ALWAYS_EAGER:
  125. return self.apply(taskset_id=taskset_id)
  126. setid = taskset_id or gen_unique_id()
  127. pub = publisher or self.Publisher(connection=connection)
  128. try:
  129. results = self._async_results(setid, pub)
  130. finally:
  131. if not publisher: # created by us.
  132. pub.close()
  133. return self.app.TaskSetResult(setid, results)
  134. def _async_results(self, taskset_id, publisher):
  135. return [task.apply_async(taskset_id=taskset_id, publisher=publisher)
  136. for task in self.tasks]
  137. def apply(self, taskset_id=None):
  138. """Applies the taskset locally by blocking until all tasks return."""
  139. setid = taskset_id or gen_unique_id()
  140. return self.app.TaskSetResult(setid, self._sync_results(setid))
  141. def _sync_results(self, taskset_id):
  142. return [task.apply(taskset_id=taskset_id) for task in self.tasks]
  143. @property
  144. def tasks(self):
  145. return self.data
  146. @property
  147. def task(self):
  148. warnings.warn(
  149. "TaskSet.task is deprecated and will be removed in 1.4",
  150. DeprecationWarning)
  151. return self._task
  152. @property
  153. def task_name(self):
  154. warnings.warn(
  155. "TaskSet.task_name is deprecated and will be removed in 1.4",
  156. DeprecationWarning)
  157. return self._task_name