canvas.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. """
  2. celery.canvas
  3. ~~~~~~~~~~~~~
  4. Designing task workflows.
  5. :copyright: (c) 2009 - 2012 by Ask Solem.
  6. :license: BSD, see LICENSE for more details.
  7. """
  8. from __future__ import absolute_import
  9. from itertools import chain as _chain
  10. from kombu.utils import kwdict, reprcall
  11. from celery import current_app
  12. from celery.local import Proxy
  13. from celery.utils import cached_property, uuid
  14. from celery.utils.functional import maybe_list
  15. from celery.utils.compat import chain_from_iterable
  16. Chord = Proxy(lambda: current_app.tasks["celery.chord"])
  17. class _getitem_property(object):
  18. def __init__(self, key):
  19. self.key = key
  20. def __get__(self, obj, type=None):
  21. if obj is None:
  22. return type
  23. return obj[self.key]
  24. def __set__(self, obj, value):
  25. obj[self.key] = value
  26. class Signature(dict):
  27. """Class that wraps the arguments and execution options
  28. for a single task invocation.
  29. Used as the parts in a :class:`group` or to safely
  30. pass tasks around as callbacks.
  31. :param task: Either a task class/instance, or the name of a task.
  32. :keyword args: Positional arguments to apply.
  33. :keyword kwargs: Keyword arguments to apply.
  34. :keyword options: Additional options to :meth:`Task.apply_async`.
  35. Note that if the first argument is a :class:`dict`, the other
  36. arguments will be ignored and the values in the dict will be used
  37. instead.
  38. >>> s = subtask("tasks.add", args=(2, 2))
  39. >>> subtask(s)
  40. {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
  41. """
  42. TYPES = {}
  43. _type = None
  44. @classmethod
  45. def register_type(cls, subclass, name=None):
  46. cls.TYPES[name or subclass.__name__] = subclass
  47. return subclass
  48. @classmethod
  49. def from_dict(self, d):
  50. typ = d.get("subtask_type")
  51. if typ:
  52. return self.TYPES[typ].from_dict(d)
  53. return Signature(d)
  54. def __init__(self, task=None, args=None, kwargs=None, options=None,
  55. type=None, subtask_type=None, **ex):
  56. init = dict.__init__
  57. if isinstance(task, dict):
  58. return init(self, task) # works like dict(d)
  59. # Also supports using task class/instance instead of string name.
  60. try:
  61. task_name = task.name
  62. except AttributeError:
  63. task_name = task
  64. else:
  65. self._type = task
  66. init(self, task=task_name, args=tuple(args or ()),
  67. kwargs=kwargs or {},
  68. options=dict(options or {}, **ex),
  69. subtask_type=subtask_type)
  70. def delay(self, *argmerge, **kwmerge):
  71. """Shortcut to `apply_async(argmerge, kwargs)`."""
  72. return self.apply_async(args=argmerge, kwargs=kwmerge)
  73. def apply(self, args=(), kwargs={}, **options):
  74. """Apply this task locally."""
  75. # For callbacks: extra args are prepended to the stored args.
  76. args, kwargs, options = self._merge(args, kwargs, options)
  77. return self.type.apply(args, kwargs, **options)
  78. def _merge(self, args=(), kwargs={}, options={}):
  79. return (tuple(args) + tuple(self.args),
  80. dict(self.kwargs, **kwargs),
  81. dict(self.options, **options))
  82. def clone(self, args=(), kwargs={}, **options):
  83. args, kwargs, options = self._merge(args, kwargs, options)
  84. s = self.from_dict({"task": self.task, "args": args,
  85. "kwargs": kwargs, "options": options,
  86. "subtask_type": self.subtask_type})
  87. s._type = self._type
  88. return s
  89. partial = clone
  90. def replace(self, args=None, kwargs=None, options=None):
  91. s = self.clone()
  92. if args is not None:
  93. s.args = args
  94. if kwargs is not None:
  95. s.kwargs = kwargs
  96. if options is not None:
  97. s.options = options
  98. return s
  99. def set(self, **options):
  100. self.options.update(options)
  101. return self
  102. def apply_async(self, args=(), kwargs={}, **options):
  103. """Apply this task asynchronously."""
  104. # For callbacks: extra args are prepended to the stored args.
  105. args, kwargs, options = self._merge(args, kwargs, options)
  106. return self.type.apply_async(args, kwargs, **options)
  107. def link(self, callback):
  108. """Add a callback task to be applied if this task
  109. executes successfully."""
  110. linked = self.options.setdefault("link", [])
  111. if callback not in linked:
  112. linked.append(callback)
  113. return callback
  114. def link_error(self, errback):
  115. """Add a callback task to be applied if an error occurs
  116. while executing this task."""
  117. linked = self.options.setdefault("link_error", [])
  118. if errback not in linked:
  119. linked.append(errback)
  120. return errback
  121. def flatten_links(self):
  122. """Gives a recursive list of dependencies (unchain if you will,
  123. but with links intact)."""
  124. return list(chain_from_iterable(_chain([[self]],
  125. (link.flatten_links()
  126. for link in maybe_list(self.options.get("link")) or []))))
  127. def __or__(self, other):
  128. if isinstance(other, chain):
  129. return chain(self.tasks + other.tasks)
  130. elif isinstance(other, Signature):
  131. return chain(self, other)
  132. return NotImplementedError
  133. def __invert__(self):
  134. return self.apply_async().get()
  135. def __reduce__(self):
  136. # for serialization, the task type is lazily loaded,
  137. # and not stored in the dict itself.
  138. return subtask, (dict(self), )
  139. def reprcall(self, *args, **kwargs):
  140. args, kwargs, _ = self._merge(args, kwargs, {})
  141. return reprcall(self["task"], args, kwargs)
  142. def __repr__(self):
  143. return self.reprcall()
  144. @cached_property
  145. def type(self):
  146. return self._type or current_app.tasks[self.task]
  147. task = _getitem_property("task")
  148. args = _getitem_property("args")
  149. kwargs = _getitem_property("kwargs")
  150. options = _getitem_property("options")
  151. subtask_type = _getitem_property("subtask_type")
  152. class chain(Signature):
  153. def __init__(self, *tasks, **options):
  154. Signature.__init__(self, "celery.chain", (), {"tasks": tasks}, options)
  155. self.tasks = tasks
  156. self.subtask_type = "chain"
  157. @classmethod
  158. def from_dict(self, d):
  159. return chain(*d["kwargs"]["tasks"], **kwdict(d["options"]))
  160. def __repr__(self):
  161. return " | ".join(map(repr, self.tasks))
  162. Signature.register_type(chain)
  163. class group(Signature):
  164. def __init__(self, tasks, **options):
  165. self.tasks = tasks = [maybe_subtask(t) for t in tasks]
  166. Signature.__init__(self, "celery.group", (), {"tasks": tasks}, options)
  167. self.subtask_type = "group"
  168. @classmethod
  169. def from_dict(self, d):
  170. return group(d["kwargs"]["tasks"], **kwdict(d["options"]))
  171. def __call__(self, **options):
  172. tasks, result = self.type.prepare(options,
  173. map(Signature.clone, self.tasks))
  174. return self.type(tasks, result)
  175. def __repr__(self):
  176. return repr(self.tasks)
  177. Signature.register_type(group)
  178. class chord(Signature):
  179. Chord = Chord
  180. def __init__(self, header, body=None, **options):
  181. Signature.__init__(self, "celery.chord", (),
  182. {"header": list(header),
  183. "body": maybe_subtask(body)}, options)
  184. self.subtask_type = "chord"
  185. @classmethod
  186. def from_dict(self, d):
  187. kwargs = d["kwargs"]
  188. return chord(kwargs["header"], kwargs["body"], **kwdict(d["options"]))
  189. def __call__(self, body=None, **options):
  190. _chord = self.Chord
  191. self.kwargs["body"] = body or self.kwargs["body"]
  192. if _chord.app.conf.CELERY_ALWAYS_EAGER:
  193. return _chord.apply((), self.kwargs)
  194. callback_id = body.options.setdefault("task_id", uuid())
  195. _chord(**self.kwargs)
  196. return _chord.AsyncResult(callback_id)
  197. def clone(self, *args, **kwargs):
  198. s = Signature.clone(self, *args, **kwargs)
  199. # need make copy of body
  200. try:
  201. kwargs["body"] = kwargs["body"].clone()
  202. except KeyError:
  203. pass
  204. return s
  205. def link(self, callback):
  206. self.body.link(callback)
  207. return callback
  208. def link_error(self, errback):
  209. self.body.link_error(errback)
  210. return errback
  211. def __repr__(self):
  212. if self.body:
  213. return self.body.reprcall(self.tasks)
  214. return "<chord without body: %r>" % (self.tasks, )
  215. @property
  216. def tasks(self):
  217. return self.kwargs["header"]
  218. @property
  219. def body(self):
  220. return self.kwargs["body"]
  221. Signature.register_type(chord)
  222. def subtask(varies, *args, **kwargs):
  223. if not (args or kwargs) and isinstance(varies, dict):
  224. if isinstance(varies, Signature):
  225. return varies.clone()
  226. return Signature.from_dict(varies)
  227. return Signature(varies, *args, **kwargs)
  228. def maybe_subtask(d):
  229. return subtask(d) if d is not None and not isinstance(d, Signature) else d