sets.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. from __future__ import absolute_import, with_statement
  2. import warnings
  3. from kombu.utils import cached_property
  4. from celery import registry
  5. from celery.app import app_or_default
  6. from celery.datastructures import AttributeDict
  7. from celery.utils import gen_unique_id
  8. from celery.utils.compat import UserList
# Message template for the DeprecationWarning that TaskSet.__init__ emits
# when it is called with the old ``TaskSet(task, arglist)`` signature.
# It is %-formatted with a mapping whose "cls" key holds the task's class
# name, which fills in every ``%(cls)s`` placeholder below.
TASKSET_DEPRECATION_TEXT = """\
Using this invocation of TaskSet is deprecated and will be removed
in Celery v2.4!
TaskSets now supports multiple types of tasks, the API has to reflect
this so the syntax has been changed to:
from celery.task.sets import TaskSet
ts = TaskSet(tasks=[
%(cls)s.subtask(args1, kwargs1, options1),
%(cls)s.subtask(args2, kwargs2, options2),
...
%(cls)s.subtask(argsN, kwargsN, optionsN),
])
result = ts.apply_async()
Thank you for your patience!
"""
  24. class subtask(AttributeDict):
  25. """Class that wraps the arguments and execution options
  26. for a single task invocation.
  27. Used as the parts in a :class:`TaskSet` or to safely
  28. pass tasks around as callbacks.
  29. :param task: Either a task class/instance, or the name of a task.
  30. :keyword args: Positional arguments to apply.
  31. :keyword kwargs: Keyword arguments to apply.
  32. :keyword options: Additional options to
  33. :func:`celery.execute.apply_async`.
  34. Note that if the first argument is a :class:`dict`, the other
  35. arguments will be ignored and the values in the dict will be used
  36. instead.
  37. >>> s = subtask("tasks.add", args=(2, 2))
  38. >>> subtask(s)
  39. {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
  40. """
  41. def __init__(self, task=None, args=None, kwargs=None, options=None, **ex):
  42. init = super(subtask, self).__init__
  43. if isinstance(task, dict):
  44. return init(task) # works like dict(d)
  45. # Also supports using task class/instance instead of string name.
  46. try:
  47. task_name = task.name
  48. except AttributeError:
  49. task_name = task
  50. init(task=task_name, args=tuple(args or ()),
  51. kwargs=dict(kwargs or {}, **ex),
  52. options=options or {})
  53. def delay(self, *argmerge, **kwmerge):
  54. """Shortcut to `apply_async(argmerge, kwargs)`."""
  55. return self.apply_async(args=argmerge, kwargs=kwmerge)
  56. def apply(self, args=(), kwargs={}, **options):
  57. """Apply this task locally."""
  58. # For callbacks: extra args are prepended to the stored args.
  59. args = tuple(args) + tuple(self.args)
  60. kwargs = dict(self.kwargs, **kwargs)
  61. options = dict(self.options, **options)
  62. return self.type.apply(args, kwargs, **options)
  63. def apply_async(self, args=(), kwargs={}, **options):
  64. """Apply this task asynchronously."""
  65. # For callbacks: extra args are prepended to the stored args.
  66. args = tuple(args) + tuple(self.args)
  67. kwargs = dict(self.kwargs, **kwargs)
  68. options = dict(self.options, **options)
  69. return self.type.apply_async(args, kwargs, **options)
  70. def get_type(self):
  71. return self.type
  72. def __reduce__(self):
  73. # for serialization, the task type is lazily loaded,
  74. # and not stored in the dict itself.
  75. return (self.__class__, (dict(self), ), None)
  76. def __repr__(self, kwformat=lambda i: "%s=%r" % i, sep=', '):
  77. kw = self["kwargs"]
  78. return "%s(%s%s%s)" % (self["task"], sep.join(map(repr, self["args"])),
  79. kw and sep or "", sep.join(map(kwformat, kw.iteritems())))
  80. @cached_property
  81. def type(self):
  82. return registry.tasks[self.task]
  83. class TaskSet(UserList):
  84. """A task containing several subtasks, making it possible
  85. to track how many, or when all of the tasks have been completed.
  86. :param tasks: A list of :class:`subtask` instances.
  87. Example::
  88. >>> urls = ("http://cnn.com/rss", "http://bbc.co.uk/rss")
  89. >>> taskset = TaskSet(refresh_feed.subtask((url, )) for url in urls)
  90. >>> taskset_result = taskset.apply_async()
  91. >>> list_of_return_values = taskset_result.join() # *expensive*
  92. """
  93. _task = None # compat
  94. _task_name = None # compat
  95. #: Total number of subtasks in this set.
  96. total = None
  97. def __init__(self, task=None, tasks=None, app=None, Publisher=None):
  98. self.app = app_or_default(app)
  99. if task is not None:
  100. if hasattr(task, "__iter__"):
  101. tasks = task
  102. else:
  103. # Previously TaskSet only supported applying one kind of task.
  104. # the signature then was TaskSet(task, arglist),
  105. # so convert the arguments to subtasks'.
  106. tasks = [subtask(task, *arglist) for arglist in tasks]
  107. task = self._task = registry.tasks[task.name]
  108. self._task_name = task.name
  109. warnings.warn(TASKSET_DEPRECATION_TEXT % {
  110. "cls": task.__class__.__name__},
  111. DeprecationWarning)
  112. self.data = list(tasks or [])
  113. self.total = len(self.tasks)
  114. self.Publisher = Publisher or self.app.amqp.TaskPublisher
  115. def apply_async(self, connection=None, connect_timeout=None,
  116. publisher=None, taskset_id=None):
  117. """Apply taskset."""
  118. app = self.app
  119. if app.conf.CELERY_ALWAYS_EAGER:
  120. return self.apply(taskset_id=taskset_id)
  121. with app.default_connection(connection, connect_timeout) as conn:
  122. setid = taskset_id or gen_unique_id()
  123. pub = publisher or self.Publisher(connection=conn)
  124. try:
  125. results = self._async_results(setid, pub)
  126. finally:
  127. if not publisher: # created by us.
  128. pub.close()
  129. return app.TaskSetResult(setid, results)
  130. def _async_results(self, taskset_id, publisher):
  131. return [task.apply_async(taskset_id=taskset_id, publisher=publisher)
  132. for task in self.tasks]
  133. def apply(self, taskset_id=None):
  134. """Applies the taskset locally by blocking until all tasks return."""
  135. setid = taskset_id or gen_unique_id()
  136. return self.app.TaskSetResult(setid, self._sync_results(setid))
  137. def _sync_results(self, taskset_id):
  138. return [task.apply(taskset_id=taskset_id) for task in self.tasks]
  139. @property
  140. def tasks(self):
  141. return self.data
  142. @property
  143. def task(self):
  144. warnings.warn(
  145. "TaskSet.task is deprecated and will be removed in 1.4",
  146. DeprecationWarning)
  147. return self._task
  148. @property
  149. def task_name(self):
  150. warnings.warn(
  151. "TaskSet.task_name is deprecated and will be removed in 1.4",
  152. DeprecationWarning)
  153. return self._task_name