canvas.py 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249
  1. # -*- coding: utf-8 -*-
  2. """Composing task work-flows.
  3. .. seealso:
  4. You should import these from :mod:`celery` and not this module.
  5. """
from collections import deque
from copy import deepcopy
from functools import partial as _partial, reduce
from itertools import chain as _chain
from operator import itemgetter

try:
    # The ABCs live in ``collections.abc``; the aliases in ``collections``
    # were removed in Python 3.10.
    from collections.abc import MutableSequence
except ImportError:  # pragma: no cover
    from collections import MutableSequence

from kombu.utils import cached_property, fxrange, reprcall, uuid
from vine import barrier

from celery._state import current_app
from celery.result import GroupResult
from celery.utils import abstract
from celery.utils.functional import (
    maybe_list, is_list, _regen, regen, chunks as _chunks,
)
from celery.utils.text import truncate
  20. __all__ = [
  21. 'Signature', 'chain', 'xmap', 'xstarmap', 'chunks',
  22. 'group', 'chord', 'signature', 'maybe_signature',
  23. ]
  24. class _getitem_property:
  25. """Attribute -> dict key descriptor.
  26. The target object must support ``__getitem__``,
  27. and optionally ``__setitem__``.
  28. Example:
  29. >>> from collections import defaultdict
  30. >>> class Me(dict):
  31. ... deep = defaultdict(dict)
  32. ...
  33. ... foo = _getitem_property('foo')
  34. ... deep_thing = _getitem_property('deep.thing')
  35. >>> me = Me()
  36. >>> me.foo
  37. None
  38. >>> me.foo = 10
  39. >>> me.foo
  40. 10
  41. >>> me['foo']
  42. 10
  43. >>> me.deep_thing = 42
  44. >>> me.deep_thing
  45. 42
  46. >>> me.deep
  47. defaultdict(<type 'dict'>, {'thing': 42})
  48. """
  49. def __init__(self, keypath, doc=None):
  50. path, _, self.key = keypath.rpartition('.')
  51. self.path = path.split('.') if path else None
  52. self.__doc__ = doc
  53. def _path(self, obj):
  54. return (reduce(lambda d, k: d[k], [obj] + self.path) if self.path
  55. else obj)
  56. def __get__(self, obj, type=None):
  57. if obj is None:
  58. return type
  59. return self._path(obj).get(self.key)
  60. def __set__(self, obj, value):
  61. self._path(obj)[self.key] = value
  62. def maybe_unroll_group(g):
  63. """Unroll group with only one member."""
  64. # Issue #1656
  65. try:
  66. size = len(g.tasks)
  67. except TypeError:
  68. try:
  69. size = g.tasks.__length_hint__()
  70. except (AttributeError, TypeError):
  71. return g
  72. else:
  73. return list(g.tasks)[0] if size == 1 else g
  74. else:
  75. return g.tasks[0] if size == 1 else g
  76. def task_name_from(task):
  77. return getattr(task, 'name', task)
  78. def _upgrade(fields, sig):
  79. """Used by custom signatures in .from_dict, to keep common fields."""
  80. sig.update(chord_size=fields.get('chord_size'))
  81. return sig
  82. @abstract.CallableSignature.register
  83. class Signature(dict):
  84. """Class that wraps the arguments and execution options
  85. for a single task invocation.
  86. Used as the parts in a :class:`group` and other constructs,
  87. or to pass tasks around as callbacks while being compatible
  88. with serializers with a strict type subset.
  89. Signatures can also be created from tasks:
  90. - Using the ``.signature()`` method which has the same signature
  91. as ``Task.apply_async``:
  92. .. code-block:: pycon
  93. >>> add.signature(args=(1,), kwargs={'kw': 2}, options={})
  94. - or the ``.s()`` shortcut that works for star arguments:
  95. .. code-block:: pycon
  96. >>> add.s(1, kw=2)
  97. - the ``.s()`` shortcut does not allow you to specify execution options
  98. but there's a chaning `.set` method that returns the signature:
  99. .. code-block:: pycon
  100. >>> add.s(2, 2).set(countdown=10).set(expires=30).delay()
  101. Note:
  102. You should use :func:`~celery.signature` to create new signatures.
  103. The ``Signature`` class is the type returned by that function and
  104. should be used for ``isinstance`` checks for signatures.
  105. See Also:
  106. :ref:`guide-canvas` for the complete guide.
  107. Arguments:
  108. task (Task, str): Either a task class/instance, or the name of a task.
  109. args (Tuple): Positional arguments to apply.
  110. kwargs (Dict): Keyword arguments to apply.
  111. options (Dict): Additional options to :meth:`Task.apply_async`.
  112. Note:
  113. If the first argument is a :class:`dict`, the other
  114. arguments will be ignored and the values in the dict will be used
  115. instead::
  116. >>> s = signature('tasks.add', args=(2, 2))
  117. >>> signature(s)
  118. {'task': 'tasks.add', args=(2, 2), kwargs={}, options={}}
  119. """
  120. TYPES = {}
  121. _app = _type = None
  122. @classmethod
  123. def register_type(cls, subclass, name=None):
  124. cls.TYPES[name or subclass.__name__] = subclass
  125. return subclass
  126. @classmethod
  127. def from_dict(cls, d, app=None):
  128. typ = d.get('subtask_type')
  129. if typ:
  130. target_cls = cls.TYPES[typ]
  131. if target_cls is not cls:
  132. return target_cls.from_dict(d, app=app)
  133. return Signature(d, app=app)
  134. def __init__(self, task=None, args=None, kwargs=None, options=None,
  135. type=None, subtask_type=None, immutable=False,
  136. app=None, **ex):
  137. self._app = app
  138. init = dict.__init__
  139. if isinstance(task, dict):
  140. return init(self, task) # works like dict(d)
  141. # Also supports using task class/instance instead of string name.
  142. try:
  143. task_name = task.name
  144. except AttributeError:
  145. task_name = task
  146. else:
  147. self._type = task
  148. init(self,
  149. task=task_name, args=tuple(args or ()),
  150. kwargs=kwargs or {},
  151. options=dict(options or {}, **ex),
  152. subtask_type=subtask_type,
  153. immutable=immutable,
  154. chord_size=None)
  155. def __call__(self, *partial_args, **partial_kwargs):
  156. """Call the task directly (in the current process)."""
  157. args, kwargs, _ = self._merge(partial_args, partial_kwargs, None)
  158. return self.type(*args, **kwargs)
  159. def delay(self, *partial_args, **partial_kwargs):
  160. """Shortcut to :meth:`apply_async` using star arguments."""
  161. return self.apply_async(partial_args, partial_kwargs)
  162. def apply(self, args=(), kwargs={}, **options):
  163. """Same as :meth:`apply_async` but executed the task inline instead
  164. of sending a task message."""
  165. # For callbacks: extra args are prepended to the stored args.
  166. args, kwargs, options = self._merge(args, kwargs, options)
  167. return self.type.apply(args, kwargs, **options)
  168. def apply_async(self, args=(), kwargs={}, route_name=None, **options):
  169. """Apply this task asynchronously.
  170. Arguments:
  171. args (Tuple): Partial args to be prepended to the existing args.
  172. kwargs (Dict): Partial kwargs to be merged with existing kwargs.
  173. options (Dict): Partial options to be merged
  174. with existing options.
  175. Returns:
  176. ~@AsyncResult: promise of future evaluation.
  177. See also:
  178. :meth:`~@Task.apply_async` and the :ref:`guide-calling` guide.
  179. """
  180. try:
  181. _apply = self._apply_async
  182. except IndexError: # pragma: no cover
  183. # no tasks for chain, etc to find type
  184. return
  185. # For callbacks: extra args are prepended to the stored args.
  186. if args or kwargs or options:
  187. args, kwargs, options = self._merge(args, kwargs, options)
  188. else:
  189. args, kwargs, options = self.args, self.kwargs, self.options
  190. return _apply(args, kwargs, **options)
  191. def _merge(self, args=(), kwargs={}, options={}, force=False):
  192. if self.immutable and not force:
  193. return (self.args, self.kwargs,
  194. dict(self.options, **options) if options else self.options)
  195. return (tuple(args) + tuple(self.args) if args else self.args,
  196. dict(self.kwargs, **kwargs) if kwargs else self.kwargs,
  197. dict(self.options, **options) if options else self.options)
  198. def clone(self, args=(), kwargs={}, **opts):
  199. """Create a copy of this signature.
  200. Arguments:
  201. args (Tuple): Partial args to be prepended to the existing args.
  202. kwargs (Dict): Partial kwargs to be merged with existing kwargs.
  203. options (Dict): Partial options to be merged with
  204. existing options.
  205. """
  206. # need to deepcopy options so origins links etc. is not modified.
  207. if args or kwargs or opts:
  208. args, kwargs, opts = self._merge(args, kwargs, opts)
  209. else:
  210. args, kwargs, opts = self.args, self.kwargs, self.options
  211. s = Signature.from_dict({'task': self.task, 'args': tuple(args),
  212. 'kwargs': kwargs, 'options': deepcopy(opts),
  213. 'subtask_type': self.subtask_type,
  214. 'chord_size': self.chord_size,
  215. 'immutable': self.immutable}, app=self._app)
  216. s._type = self._type
  217. return s
  218. partial = clone
  219. def freeze(self, _id=None, group_id=None, chord=None,
  220. root_id=None, parent_id=None):
  221. """Finalize the signature by adding a concrete task id.
  222. The task will not be called and you should not call the signature
  223. twice after freezing it as that will result in two task messages
  224. using the same task id.
  225. Returns:
  226. ~@AsyncResult: promise of future evaluation.
  227. """
  228. opts = self.options
  229. try:
  230. tid = opts['task_id']
  231. except KeyError:
  232. tid = opts['task_id'] = _id or uuid()
  233. if root_id:
  234. opts['root_id'] = root_id
  235. if parent_id:
  236. opts['parent_id'] = parent_id
  237. if 'reply_to' not in opts:
  238. opts['reply_to'] = self.app.oid
  239. if group_id:
  240. opts['group_id'] = group_id
  241. if chord:
  242. opts['chord'] = chord
  243. return self.AsyncResult(tid)
  244. _freeze = freeze
  245. def replace(self, args=None, kwargs=None, options=None):
  246. """Replace the args, kwargs or options set for this signature.
  247. These are only replaced if the argument for the section is
  248. not :const:`None`."""
  249. s = self.clone()
  250. if args is not None:
  251. s.args = args
  252. if kwargs is not None:
  253. s.kwargs = kwargs
  254. if options is not None:
  255. s.options = options
  256. return s
  257. def set(self, immutable=None, **options):
  258. """Set arbitrary execution options (same as ``.options.update(…)``).
  259. Returns:
  260. Signature: This is a chaining method call
  261. (i.e. it will return ``self``).
  262. """
  263. if immutable is not None:
  264. self.set_immutable(immutable)
  265. self.options.update(options)
  266. return self
  267. def set_immutable(self, immutable):
  268. self.immutable = immutable
  269. def set_parent_id(self, parent_id):
  270. self.parent_id = parent_id
  271. def _with_list_option(self, key):
  272. items = self.options.setdefault(key, [])
  273. if not isinstance(items, MutableSequence):
  274. items = self.options[key] = [items]
  275. return items
  276. def append_to_list_option(self, key, value):
  277. items = self._with_list_option(key)
  278. if value not in items:
  279. items.append(value)
  280. return value
  281. def extend_list_option(self, key, value):
  282. items = self._with_list_option(key)
  283. items.extend(maybe_list(value))
  284. def link(self, callback):
  285. """Add a callback task to be applied if this task
  286. executes successfully.
  287. Returns:
  288. Signature: the argument passed, for chaining
  289. or use with :func:`~functools.reduce`.
  290. """
  291. return self.append_to_list_option('link', callback)
  292. def link_error(self, errback):
  293. """Add a callback task to be applied if an error occurs
  294. while executing this task.
  295. Returns:
  296. Signature: the argument passed, for chaining
  297. or use with :func:`~functools.reduce`.
  298. """
  299. return self.append_to_list_option('link_error', errback)
  300. def on_error(self, errback):
  301. """Version of :meth:`link_error` that supports chaining.
  302. on_error chains the original signature, not the errback so::
  303. >>> add.s(2, 2).on_error(errback.s()).delay()
  304. calls the ``add`` task, not the ``errback`` task, but the
  305. reverse is true for :meth:`link_error`.
  306. """
  307. self.link_error(errback)
  308. return self
  309. def flatten_links(self):
  310. """Return a recursive list of dependencies (unchain if you will,
  311. but with links intact)."""
  312. return list(_chain.from_iterable(_chain(
  313. [[self]],
  314. (link.flatten_links()
  315. for link in maybe_list(self.options.get('link')) or [])
  316. )))
  317. def __or__(self, other):
  318. if isinstance(self, group):
  319. if isinstance(other, group):
  320. return group(_chain(self.tasks, other.tasks), app=self.app)
  321. return chord(self, body=other, app=self._app)
  322. elif isinstance(other, group):
  323. other = maybe_unroll_group(other)
  324. if not isinstance(self, chain) and isinstance(other, chain):
  325. return chain((self,) + other.tasks, app=self._app)
  326. elif isinstance(other, chain):
  327. return chain(*self.tasks + other.tasks, app=self._app)
  328. elif isinstance(other, Signature):
  329. if isinstance(self, chain):
  330. return chain(*self.tasks + (other,), app=self._app)
  331. return chain(self, other, app=self._app)
  332. return NotImplemented
  333. def __deepcopy__(self, memo):
  334. memo[id(self)] = self
  335. return dict(self)
  336. def __invert__(self):
  337. return self.apply_async().get()
  338. def __reduce__(self):
  339. # for serialization, the task type is lazily loaded,
  340. # and not stored in the dict itself.
  341. return signature, (dict(self),)
  342. def __json__(self):
  343. return dict(self)
  344. def reprcall(self, *args, **kwargs):
  345. args, kwargs, _ = self._merge(args, kwargs, {}, force=True)
  346. return reprcall(self['task'], args, kwargs)
  347. def election(self):
  348. type = self.type
  349. app = type.app
  350. tid = self.options.get('task_id') or uuid()
  351. with app.producer_or_acquire(None) as P:
  352. props = type.backend.on_task_call(P, tid)
  353. app.control.election(tid, 'task', self.clone(task_id=tid, **props),
  354. connection=P.connection)
  355. return type.AsyncResult(tid)
  356. def __repr__(self):
  357. return self.reprcall()
  358. @property
  359. def name(self):
  360. # for duck typing compatibility with Task.name
  361. return self.task
  362. @cached_property
  363. def type(self):
  364. return self._type or self.app.tasks[self['task']]
  365. @cached_property
  366. def app(self):
  367. return self._app or current_app
  368. @cached_property
  369. def AsyncResult(self):
  370. try:
  371. return self.type.AsyncResult
  372. except KeyError: # task not registered
  373. return self.app.AsyncResult
  374. @cached_property
  375. def _apply_async(self):
  376. try:
  377. return self.type.apply_async
  378. except KeyError:
  379. return _partial(self.app.send_task, self['task'])
  380. id = _getitem_property('options.task_id', 'Task UUID')
  381. parent_id = _getitem_property('options.parent_id', 'Task parent UUID.')
  382. root_id = _getitem_property('options.root_id', 'Task root UUID.')
  383. task = _getitem_property('task', 'Name of task.')
  384. args = _getitem_property('args', 'Positional arguments to task.')
  385. kwargs = _getitem_property('kwargs', 'Keyword arguments to task.')
  386. options = _getitem_property('options', 'Task execution options.')
  387. subtask_type = _getitem_property('subtask_type', 'Type of signature')
  388. chord_size = _getitem_property(
  389. 'chord_size', 'Size of chord (if applicable)')
  390. immutable = _getitem_property(
  391. 'immutable', 'Flag set if no longer accepts new arguments')
  392. @Signature.register_type
  393. class chain(Signature):
  394. """Chains tasks together, so that each tasks follows each other
  395. by being applied as a callback of the previous task.
  396. Note:
  397. If called with only one argument, then that argument must
  398. be an iterable of tasks to chain, which means you can
  399. use this with a generator expression.
  400. Example:
  401. This is effectively :math:`((2 + 2) + 4)`:
  402. .. code-block:: pycon
  403. >>> res = chain(add.s(2, 2), add.s(4))()
  404. >>> res.get()
  405. 8
  406. Calling a chain will return the result of the last task in the chain.
  407. You can get to the other tasks by following the ``result.parent``'s:
  408. .. code-block:: pycon
  409. >>> res.parent.get()
  410. 4
  411. Using a generator expression:
  412. .. code-block:: pycon
  413. >>> lazy_chain = chain(add.s(i) for i in range(10))
  414. >>> res = lazy_chain(3)
  415. Arguments:
  416. *tasks (Signature): List of task signatures to chain.
  417. If only one argument is passed and that argument is
  418. an iterable, then that will be used as the list of signatures
  419. to chain instead. This means that you can use a generator
  420. expression.
  421. Returns:
  422. ~celery.chain: A lazy signature that can be called to apply the first
  423. task in the chain. When that task succeeed the next task in the
  424. chain is applied, and so on.
  425. """
  426. tasks = _getitem_property('kwargs.tasks', 'Tasks in chain.')
  427. def __init__(self, *tasks, **options):
  428. tasks = (regen(tasks[0]) if len(tasks) == 1 and is_list(tasks[0])
  429. else tasks)
  430. Signature.__init__(
  431. self, 'celery.chain', (), {'tasks': tasks}, **options
  432. )
  433. self._use_link = options.pop('use_link', None)
  434. self.subtask_type = 'chain'
  435. self._frozen = None
  436. def __call__(self, *args, **kwargs):
  437. if self.tasks:
  438. return self.apply_async(args, kwargs)
  439. def clone(self, *args, **kwargs):
  440. s = Signature.clone(self, *args, **kwargs)
  441. s.kwargs['tasks'] = [sig.clone() for sig in s.kwargs['tasks']]
  442. return s
  443. def apply_async(self, args=(), kwargs={}, **options):
  444. # python is best at unpacking kwargs, so .run is here to do that.
  445. app = self.app
  446. if app.conf.task_always_eager:
  447. return self.apply(args, kwargs, **options)
  448. return self.run(args, kwargs, app=app, **(
  449. dict(self.options, **options) if options else self.options))
  450. def run(self, args=(), kwargs={}, group_id=None, chord=None,
  451. task_id=None, link=None, link_error=None, publisher=None,
  452. producer=None, root_id=None, parent_id=None, app=None, **options):
  453. app = app or self.app
  454. use_link = self._use_link
  455. if use_link is None and app.conf.task_protocol == 1:
  456. use_link = True
  457. args = (tuple(args) + tuple(self.args)
  458. if args and not self.immutable else self.args)
  459. if self._frozen:
  460. tasks, results = self._frozen
  461. else:
  462. tasks, results = self.prepare_steps(
  463. args, self.tasks, root_id, parent_id, link_error, app,
  464. task_id, group_id, chord,
  465. )
  466. if results:
  467. if link:
  468. tasks[0].extend_list_option('link', link)
  469. first_task = tasks.pop()
  470. first_task.apply_async(
  471. chain=tasks if not use_link else None, **options)
  472. return results[0]
  473. def freeze(self, _id=None, group_id=None, chord=None,
  474. root_id=None, parent_id=None):
  475. _, results = self._frozen = self.prepare_steps(
  476. self.args, self.tasks, root_id, parent_id, None,
  477. self.app, _id, group_id, chord, clone=False,
  478. )
  479. return results[0]
  480. def prepare_steps(self, args, tasks,
  481. root_id=None, parent_id=None, link_error=None, app=None,
  482. last_task_id=None, group_id=None, chord_body=None,
  483. clone=True, from_dict=Signature.from_dict):
  484. app = app or self.app
  485. # use chain message field for protocol 2 and later.
  486. # this avoids pickle blowing the stack on the recursion
  487. # required by linking task together in a tree structure.
  488. # (why is pickle using recursion? or better yet why cannot python
  489. # do tail call optimization making recursion actually useful?)
  490. use_link = self._use_link
  491. if use_link is None and app.conf.task_protocol == 1:
  492. use_link = True
  493. steps = deque(tasks)
  494. steps_pop = steps.pop
  495. steps_extend = steps.extend
  496. prev_task = None
  497. prev_res = prev_prev_res = None
  498. tasks, results = [], []
  499. i = 0
  500. while steps:
  501. task = steps_pop()
  502. is_first_task, is_last_task = not steps, not i
  503. if not isinstance(task, abstract.CallableSignature):
  504. task = from_dict(task, app=app)
  505. if isinstance(task, group):
  506. task = maybe_unroll_group(task)
  507. # first task gets partial args from chain
  508. if clone:
  509. task = task.clone(args) if is_first_task else task.clone()
  510. elif is_first_task:
  511. task.args = tuple(args) + tuple(task.args)
  512. if isinstance(task, chain):
  513. # splice the chain
  514. steps_extend(task.tasks)
  515. continue
  516. if isinstance(task, group) and prev_task:
  517. # automatically upgrade group(...) | s to chord(group, s)
  518. # for chords we freeze by pretending it's a normal
  519. # signature instead of a group.
  520. tasks.pop()
  521. results.pop()
  522. task = chord(
  523. task, body=prev_task,
  524. task_id=prev_res.id, root_id=root_id, app=app,
  525. )
  526. prev_res = prev_prev_res
  527. if is_last_task:
  528. # chain(task_id=id) means task id is set for the last task
  529. # in the chain. If the chord is part of a chord/group
  530. # then that chord/group must synchronize based on the
  531. # last task in the chain, so we only set the group_id and
  532. # chord callback for the last task.
  533. res = task.freeze(
  534. last_task_id,
  535. root_id=root_id, group_id=group_id, chord=chord_body,
  536. )
  537. else:
  538. res = task.freeze(root_id=root_id)
  539. i += 1
  540. if prev_task:
  541. prev_task.set_parent_id(task.id)
  542. if use_link:
  543. # link previous task to this task.
  544. task.link(prev_task)
  545. if prev_res:
  546. prev_res.parent = res
  547. if is_first_task and parent_id is not None:
  548. task.set_parent_id(parent_id)
  549. if link_error:
  550. for errback in maybe_list(link_error):
  551. task.link_error(errback)
  552. tasks.append(task)
  553. results.append(res)
  554. prev_task, prev_prev_res, prev_res = (
  555. task, prev_res, res,
  556. )
  557. if root_id is None and tasks:
  558. root_id = tasks[-1].id
  559. for task in reversed(tasks):
  560. task.options['root_id'] = root_id
  561. return tasks, results
  562. def apply(self, args=(), kwargs={}, **options):
  563. last, fargs = None, args
  564. for task in self.tasks:
  565. res = task.clone(fargs).apply(
  566. last and (last.get(),), **dict(self.options, **options))
  567. res.parent, last, fargs = last, res, None
  568. return last
  569. @classmethod
  570. def from_dict(self, d, app=None):
  571. tasks = d['kwargs']['tasks']
  572. if tasks:
  573. if isinstance(tasks, tuple): # aaaargh
  574. tasks = d['kwargs']['tasks'] = list(tasks)
  575. # First task must be signature object to get app
  576. tasks[0] = maybe_signature(tasks[0], app=app)
  577. return _upgrade(d, chain(*tasks, app=app, **d['options']))
  578. @property
  579. def app(self):
  580. app = self._app
  581. if app is None:
  582. try:
  583. app = self.tasks[0]._app
  584. except LookupError:
  585. pass
  586. return app or current_app
  587. def __repr__(self):
  588. return ' | '.join(repr(t) for t in self.tasks)
  589. class _basemap(Signature):
  590. _task_name = None
  591. _unpack_args = itemgetter('task', 'it')
  592. def __init__(self, task, it, **options):
  593. Signature.__init__(
  594. self, self._task_name, (),
  595. {'task': task, 'it': regen(it)}, immutable=True, **options
  596. )
  597. def apply_async(self, args=(), kwargs={}, **opts):
  598. # need to evaluate generators
  599. task, it = self._unpack_args(self.kwargs)
  600. return self.type.apply_async(
  601. (), {'task': task, 'it': list(it)},
  602. route_name=task_name_from(self.kwargs.get('task')), **opts
  603. )
  604. @classmethod
  605. def from_dict(cls, d, app=None):
  606. return _upgrade(
  607. d, cls(*cls._unpack_args(d['kwargs']), app=app, **d['options']),
  608. )
  609. @Signature.register_type
  610. class xmap(_basemap):
  611. _task_name = 'celery.map'
  612. def __repr__(self):
  613. task, it = self._unpack_args(self.kwargs)
  614. return '[{0}(x) for x in {1}]'.format(task.task,
  615. truncate(repr(it), 100))
  616. @Signature.register_type
  617. class xstarmap(_basemap):
  618. _task_name = 'celery.starmap'
  619. def __repr__(self):
  620. task, it = self._unpack_args(self.kwargs)
  621. return '[{0}(*x) for x in {1}]'.format(task.task,
  622. truncate(repr(it), 100))
  623. @Signature.register_type
  624. class chunks(Signature):
  625. _unpack_args = itemgetter('task', 'it', 'n')
  626. def __init__(self, task, it, n, **options):
  627. Signature.__init__(
  628. self, 'celery.chunks', (),
  629. {'task': task, 'it': regen(it), 'n': n},
  630. immutable=True, **options
  631. )
  632. @classmethod
  633. def from_dict(self, d, app=None):
  634. return _upgrade(
  635. d, chunks(*self._unpack_args(
  636. d['kwargs']), app=app, **d['options']),
  637. )
  638. def apply_async(self, args=(), kwargs={}, **opts):
  639. return self.group().apply_async(
  640. args, kwargs,
  641. route_name=task_name_from(self.kwargs.get('task')), **opts
  642. )
  643. def __call__(self, **options):
  644. return self.apply_async(**options)
  645. def group(self):
  646. # need to evaluate generators
  647. task, it, n = self._unpack_args(self.kwargs)
  648. return group((xstarmap(task, part, app=self._app)
  649. for part in _chunks(iter(it), n)),
  650. app=self._app)
  651. @classmethod
  652. def apply_chunks(cls, task, it, n, app=None):
  653. return cls(task, it, n, app=app)()
  654. def _maybe_group(tasks, app):
  655. if isinstance(tasks, dict):
  656. tasks = signature(tasks, app=app)
  657. if isinstance(tasks, group):
  658. tasks = tasks.tasks
  659. elif isinstance(tasks, abstract.CallableSignature):
  660. tasks = [tasks]
  661. else:
  662. tasks = [signature(t, app=app) for t in tasks]
  663. return tasks
  664. @Signature.register_type
  665. class group(Signature):
  666. """Creates a group of tasks to be executed in parallel.
  667. A group is lazy so you must call it to take action and evaluate
  668. the group.
  669. Note:
  670. If only one argument is passed, and that argument is an iterable
  671. then that will be used as the list of tasks instead, which
  672. means you can use ``group`` with generator expressions.
  673. Example:
  674. >>> lazy_group = group([add.s(2, 2), add.s(4, 4)])
  675. >>> promise = lazy_group() # <-- evaluate: returns lazy result.
  676. >>> promise.get() # <-- will wait for the task to return
  677. [4, 8]
  678. Arguments:
  679. *tasks (Signature): A list of signatures that this group will call.
  680. If there is only one argument, and that argument is an iterable,
  681. then that will define the list of signatures instead.
  682. **options (Any): Execution options applied to all tasks
  683. in the group.
  684. Returns:
  685. ~celery.group: signature that when called will then call all of the
  686. tasks in the group (and return a :class:`GroupResult` instance
  687. that can be used to inspect the state of the group).
  688. """
  689. tasks = _getitem_property('kwargs.tasks', 'Tasks in group.')
  690. def __init__(self, *tasks, **options):
  691. if len(tasks) == 1:
  692. tasks = tasks[0]
  693. if isinstance(tasks, group):
  694. tasks = tasks.tasks
  695. if not isinstance(tasks, _regen):
  696. tasks = regen(tasks)
  697. Signature.__init__(
  698. self, 'celery.group', (), {'tasks': tasks}, **options
  699. )
  700. self.subtask_type = 'group'
  701. @classmethod
  702. def from_dict(self, d, app=None):
  703. return _upgrade(
  704. d, group(d['kwargs']['tasks'], app=app, **d['options']),
  705. )
  706. def __len__(self):
  707. return len(self.tasks)
  708. def _prepared(self, tasks, partial_args, group_id, root_id, app,
  709. CallableSignature=abstract.CallableSignature,
  710. from_dict=Signature.from_dict,
  711. isinstance=isinstance, tuple=tuple):
  712. for task in tasks:
  713. if isinstance(task, CallableSignature):
  714. # local sigs are always of type Signature, and we
  715. # clone them to make sure we do not modify the originals.
  716. task = task.clone()
  717. else:
  718. # serialized sigs must be converted to Signature.
  719. task = from_dict(task, app=app)
  720. if isinstance(task, group):
  721. # needs yield_from :(
  722. unroll = task._prepared(
  723. task.tasks, partial_args, group_id, root_id, app,
  724. )
  725. for taskN, resN in unroll:
  726. yield taskN, resN
  727. else:
  728. if partial_args and not task.immutable:
  729. task.args = tuple(partial_args) + tuple(task.args)
  730. yield task, task.freeze(group_id=group_id, root_id=root_id)
  731. def _apply_tasks(self, tasks, producer=None, app=None, p=None,
  732. add_to_parent=None, chord=None, **options):
  733. app = app or self.app
  734. with app.producer_or_acquire(producer) as producer:
  735. for sig, res in tasks:
  736. sig.apply_async(producer=producer, add_to_parent=False,
  737. chord=sig.options.get('chord') or chord,
  738. **options)
  739. # adding callback to result, such that it will gradually
  740. # fulfill the barrier.
  741. #
  742. # Using barrier.add would use result.then, but we need
  743. # to add the weak argument here to only create a weak
  744. # reference to the object.
  745. if p and not p.cancelled and not p.ready:
  746. p.size += 1
  747. res.then(p, weak=True)
  748. yield res # <-- r.parent, etc set in the frozen result.
  749. def _freeze_gid(self, options):
  750. # remove task_id and use that as the group_id,
  751. # if we don't remove it then every task will have the same id...
  752. options = dict(self.options, **options)
  753. options['group_id'] = group_id = (
  754. options.pop('task_id', uuid()))
  755. return options, group_id, options.get('root_id')
  756. def set_parent_id(self, parent_id):
  757. for task in self.tasks:
  758. task.set_parent_id(parent_id)
  759. def apply_async(self, args=(), kwargs=None, add_to_parent=True,
  760. producer=None, **options):
  761. app = self.app
  762. if app.conf.task_always_eager:
  763. return self.apply(args, kwargs, **options)
  764. if not self.tasks:
  765. return self.freeze()
  766. options, group_id, root_id = self._freeze_gid(options)
  767. tasks = self._prepared(self.tasks, args, group_id, root_id, app)
  768. p = barrier()
  769. results = list(self._apply_tasks(tasks, producer, app, p, **options))
  770. result = self.app.GroupResult(group_id, results, ready_barrier=p)
  771. p.finalize()
  772. # - Special case of group(A.s() | group(B.s(), C.s()))
  773. # That is, group with single item that is a chain but the
  774. # last task in that chain is a group.
  775. #
  776. # We cannot actually support arbitrary GroupResults in chains,
  777. # but this special case we can.
  778. if len(result) == 1 and isinstance(result[0], GroupResult):
  779. result = result[0]
  780. parent_task = app.current_worker_task
  781. if add_to_parent and parent_task:
  782. parent_task.add_trail(result)
  783. return result
  784. def apply(self, args=(), kwargs={}, **options):
  785. app = self.app
  786. if not self.tasks:
  787. return self.freeze() # empty group returns GroupResult
  788. options, group_id, root_id = self._freeze_gid(options)
  789. tasks = self._prepared(self.tasks, args, group_id, root_id, app)
  790. return app.GroupResult(group_id, [
  791. sig.apply(**options) for sig, _ in tasks
  792. ])
  793. def set_immutable(self, immutable):
  794. for task in self.tasks:
  795. task.set_immutable(immutable)
  796. def link(self, sig):
  797. # Simply link to first task
  798. sig = sig.clone().set(immutable=True)
  799. return self.tasks[0].link(sig)
  800. def link_error(self, sig):
  801. sig = sig.clone().set(immutable=True)
  802. return self.tasks[0].link_error(sig)
  803. def __call__(self, *partial_args, **options):
  804. return self.apply_async(partial_args, **options)
  805. def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
  806. stack = deque(self.tasks)
  807. while stack:
  808. task = maybe_signature(stack.popleft(), app=self._app).clone()
  809. if isinstance(task, group):
  810. stack.extendleft(task.tasks)
  811. else:
  812. new_tasks.append(task)
  813. yield task.freeze(group_id=group_id,
  814. chord=chord, root_id=root_id,
  815. parent_id=parent_id)
  816. def freeze(self, _id=None, group_id=None, chord=None,
  817. root_id=None, parent_id=None):
  818. opts = self.options
  819. try:
  820. gid = opts['task_id']
  821. except KeyError:
  822. gid = opts['task_id'] = uuid()
  823. if group_id:
  824. opts['group_id'] = group_id
  825. if chord:
  826. opts['chord'] = chord
  827. root_id = opts.setdefault('root_id', root_id)
  828. parent_id = opts.setdefault('parent_id', parent_id)
  829. new_tasks = []
  830. # Need to unroll subgroups early so that chord gets the
  831. # right result instance for chord_unlock etc.
  832. results = list(self._freeze_unroll(
  833. new_tasks, group_id, chord, root_id, parent_id,
  834. ))
  835. if isinstance(self.tasks, MutableSequence):
  836. self.tasks[:] = new_tasks
  837. else:
  838. self.tasks = new_tasks
  839. return self.app.GroupResult(gid, results)
  840. _freeze = freeze
  841. def skew(self, start=1.0, stop=None, step=1.0):
  842. it = fxrange(start, stop, step, repeatlast=True)
  843. for task in self.tasks:
  844. task.set(countdown=next(it))
  845. return self
  846. def __iter__(self):
  847. return iter(self.tasks)
  848. def __repr__(self):
  849. return 'group({0.tasks!r})'.format(self)
  850. @property
  851. def app(self):
  852. app = self._app
  853. if app is None:
  854. try:
  855. app = self.tasks[0].app
  856. except LookupError:
  857. pass
  858. return app if app is not None else current_app
  859. @Signature.register_type
  860. class chord(Signature):
  861. """Barrier synchronization primitive.
  862. A chord consists of a header and a body.
  863. The header is a group of tasks that must complete before the callback is
  864. called. A chord is essentially a callback for a group of tasks.
  865. The body is applied with the return values of all the header
  866. tasks as a list.
  867. Example:
  868. The chrod:
  869. .. code-block:: pycon
  870. >>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())
  871. is effectively :math:`\Sigma ((2 + 2) + (4 + 4))`:
  872. .. code-block:: pycon
  873. >>> res.get()
  874. 12
  875. """
  876. def __init__(self, header, body=None, task='celery.chord',
  877. args=(), kwargs={}, app=None, **options):
  878. Signature.__init__(
  879. self, task, args,
  880. dict(kwargs, header=_maybe_group(header, app),
  881. body=maybe_signature(body, app=app)), app=app, **options
  882. )
  883. self.subtask_type = 'chord'
  884. def freeze(self, _id=None, group_id=None, chord=None,
  885. root_id=None, parent_id=None):
  886. if not isinstance(self.tasks, group):
  887. self.tasks = group(self.tasks, app=self.app)
  888. bodyres = self.body.freeze(_id, parent_id=self.id, root_id=root_id)
  889. self.tasks.freeze(
  890. parent_id=parent_id, root_id=root_id, chord=self.body)
  891. self.id = self.tasks.id
  892. self.body.set_parent_id(self.id)
  893. return bodyres
  894. def set_parent_id(self, parent_id):
  895. tasks = self.tasks
  896. if isinstance(tasks, group):
  897. tasks = tasks.tasks
  898. for task in tasks:
  899. task.set_parent_id(parent_id)
  900. self.parent_id = parent_id
  901. @classmethod
  902. def from_dict(self, d, app=None):
  903. args, d['kwargs'] = self._unpack_args(**d['kwargs'])
  904. return _upgrade(d, self(*args, app=app, **d))
  905. @staticmethod
  906. def _unpack_args(header=None, body=None, **kwargs):
  907. # Python signatures are better at extracting keys from dicts
  908. # than manually popping things off.
  909. return (header, body), kwargs
  910. @cached_property
  911. def app(self):
  912. return self._get_app(self.body)
  913. def _get_app(self, body=None):
  914. app = self._app
  915. if app is None:
  916. try:
  917. tasks = self.tasks.tasks # is a group
  918. except AttributeError:
  919. tasks = self.tasks
  920. app = tasks[0]._app
  921. if app is None and body is not None:
  922. app = body._app
  923. return app if app is not None else current_app
  924. def apply_async(self, args=(), kwargs={}, task_id=None,
  925. producer=None, publisher=None, connection=None,
  926. router=None, result_cls=None, **options):
  927. args = (tuple(args) + tuple(self.args)
  928. if args and not self.immutable else self.args)
  929. body = kwargs.get('body') or self.kwargs['body']
  930. kwargs = dict(self.kwargs, **kwargs)
  931. body = body.clone(**options)
  932. app = self._get_app(body)
  933. tasks = (self.tasks.clone() if isinstance(self.tasks, group)
  934. else group(self.tasks, app=app))
  935. if app.conf.task_always_eager:
  936. return self.apply(args, kwargs,
  937. body=body, task_id=task_id, **options)
  938. return self.run(tasks, body, args, task_id=task_id, **options)
  939. def apply(self, args=(), kwargs={}, propagate=True, body=None, **options):
  940. body = self.body if body is None else body
  941. tasks = (self.tasks.clone() if isinstance(self.tasks, group)
  942. else group(self.tasks, app=self.app))
  943. return body.apply(
  944. args=(tasks.apply(args, kwargs).get(propagate=propagate),),
  945. )
  946. def _traverse_tasks(self, tasks, value=None):
  947. stack = deque(list(tasks))
  948. while stack:
  949. task = stack.popleft()
  950. if isinstance(task, group):
  951. stack.extend(task.tasks)
  952. else:
  953. yield task if value is None else value
  954. def __length_hint__(self):
  955. return sum(self._traverse_tasks(self.tasks, 1))
  956. def run(self, header, body, partial_args, app=None, interval=None,
  957. countdown=1, max_retries=None, eager=False,
  958. task_id=None, **options):
  959. app = app or self._get_app(body)
  960. group_id = uuid()
  961. root_id = body.options.get('root_id')
  962. body.chord_size = self.__length_hint__()
  963. options = dict(self.options, **options) if options else self.options
  964. if options:
  965. options.pop('task_id', None)
  966. body.options.update(options)
  967. results = header.freeze(
  968. group_id=group_id, chord=body, root_id=root_id).results
  969. bodyres = body.freeze(task_id, root_id=root_id)
  970. parent = app.backend.apply_chord(
  971. header, partial_args, group_id, body,
  972. interval=interval, countdown=countdown,
  973. options=options, max_retries=max_retries,
  974. result=results)
  975. bodyres.parent = parent
  976. return bodyres
  977. def __call__(self, body=None, **options):
  978. return self.apply_async((), {'body': body} if body else {}, **options)
  979. def clone(self, *args, **kwargs):
  980. s = Signature.clone(self, *args, **kwargs)
  981. # need to make copy of body
  982. try:
  983. s.kwargs['body'] = s.kwargs['body'].clone()
  984. except (AttributeError, KeyError):
  985. pass
  986. return s
  987. def link(self, callback):
  988. self.body.link(callback)
  989. return callback
  990. def link_error(self, errback):
  991. self.body.link_error(errback)
  992. return errback
  993. def set_immutable(self, immutable):
  994. # changes mutability of header only, not callback.
  995. for task in self.tasks:
  996. task.set_immutable(immutable)
  997. def __repr__(self):
  998. if self.body:
  999. return self.body.reprcall(self.tasks)
  1000. return '<chord without body: {0.tasks!r}>'.format(self)
  1001. tasks = _getitem_property('kwargs.header', 'Tasks in chord header.')
  1002. body = _getitem_property('kwargs.body', 'Body task of chord.')
  1003. def signature(varies, *args, **kwargs):
  1004. """Create new signature
  1005. - if the first argument is a signature already then it's cloned.
  1006. - if the first argument is a dict, then a Signature version is returned.
  1007. Returns:
  1008. Signature: The resulting signature.
  1009. """
  1010. app = kwargs.get('app')
  1011. if isinstance(varies, dict):
  1012. if isinstance(varies, abstract.CallableSignature):
  1013. return varies.clone()
  1014. return Signature.from_dict(varies, app=app)
  1015. return Signature(varies, *args, **kwargs)
  1016. def maybe_signature(d, app=None):
  1017. if d is not None:
  1018. if (isinstance(d, dict) and
  1019. not isinstance(d, abstract.CallableSignature)):
  1020. d = signature(d)
  1021. if app is not None:
  1022. d._app = app
  1023. return d