builtins.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.builtins
  4. ~~~~~~~~~~~~~~~~~~~
  5. Built-in tasks that are always available in all
  6. app instances. E.g. chord, group and xmap.
  7. """
  8. from __future__ import absolute_import
  9. from collections import deque
  10. from celery._state import get_current_worker_task, connect_on_app_finalize
  11. from celery.utils import uuid
  12. from celery.utils.log import get_logger
# Nothing is exported by name: ``from celery.app.builtins import *``
# brings no names into the importing namespace.
__all__ = []

# Module-level logger obtained for this module's name; the chord unlock
# task below uses it to report chord failures.
logger = get_logger(__name__)
  15. @connect_on_app_finalize
  16. def add_backend_cleanup_task(app):
  17. """The backend cleanup task can be used to clean up the default result
  18. backend.
  19. If the configured backend requires periodic cleanup this task is also
  20. automatically configured to run every day at midnight (requires
  21. :program:`celery beat` to be running).
  22. """
  23. @app.task(name='celery.backend_cleanup',
  24. shared=False, _force_evaluate=True)
  25. def backend_cleanup():
  26. app.backend.cleanup()
  27. return backend_cleanup
  28. @connect_on_app_finalize
  29. def add_unlock_chord_task(app):
  30. """This task is used by result backends without native chord support.
  31. It joins chords by creating a task chain polling the header for completion.
  32. """
  33. from celery.canvas import signature
  34. from celery.exceptions import ChordError
  35. from celery.result import allow_join_result, result_from_tuple
  36. default_propagate = app.conf.CELERY_CHORD_PROPAGATES
  37. @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
  38. default_retry_delay=1, ignore_result=True, _force_evaluate=True)
  39. def unlock_chord(group_id, callback, interval=None, propagate=None,
  40. max_retries=None, result=None,
  41. Result=app.AsyncResult, GroupResult=app.GroupResult,
  42. result_from_tuple=result_from_tuple):
  43. # if propagate is disabled exceptions raised by chord tasks
  44. # will be sent as part of the result list to the chord callback.
  45. # Since 3.1 propagate will be enabled by default, and instead
  46. # the chord callback changes state to FAILURE with the
  47. # exception set to ChordError.
  48. propagate = default_propagate if propagate is None else propagate
  49. if interval is None:
  50. interval = unlock_chord.default_retry_delay
  51. # check if the task group is ready, and if so apply the callback.
  52. deps = GroupResult(
  53. group_id,
  54. [result_from_tuple(r, app=app) for r in result],
  55. )
  56. j = deps.join_native if deps.supports_native_join else deps.join
  57. if deps.ready():
  58. callback = signature(callback, app=app)
  59. try:
  60. with allow_join_result():
  61. ret = j(timeout=3.0, propagate=propagate)
  62. except Exception as exc:
  63. try:
  64. culprit = next(deps._failed_join_report())
  65. reason = 'Dependency {0.id} raised {1!r}'.format(
  66. culprit, exc,
  67. )
  68. except StopIteration:
  69. reason = repr(exc)
  70. logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
  71. app.backend.chord_error_from_stack(callback,
  72. ChordError(reason))
  73. else:
  74. try:
  75. callback.delay(ret)
  76. except Exception as exc:
  77. logger.error('Chord %r raised: %r', group_id, exc,
  78. exc_info=1)
  79. app.backend.chord_error_from_stack(
  80. callback,
  81. exc=ChordError('Callback error: {0!r}'.format(exc)),
  82. )
  83. else:
  84. raise unlock_chord.retry(countdown=interval,
  85. max_retries=max_retries)
  86. return unlock_chord
  87. @connect_on_app_finalize
  88. def add_map_task(app):
  89. from celery.canvas import signature
  90. @app.task(name='celery.map', shared=False, _force_evaluate=True)
  91. def xmap(task, it):
  92. task = signature(task, app=app).type
  93. return [task(item) for item in it]
  94. return xmap
  95. @connect_on_app_finalize
  96. def add_starmap_task(app):
  97. from celery.canvas import signature
  98. @app.task(name='celery.starmap', shared=False, _force_evaluate=True)
  99. def xstarmap(task, it):
  100. task = signature(task, app=app).type
  101. return [task(*item) for item in it]
  102. return xstarmap
  103. @connect_on_app_finalize
  104. def add_chunk_task(app):
  105. from celery.canvas import chunks as _chunks
  106. @app.task(name='celery.chunks', shared=False, _force_evaluate=True)
  107. def chunks(task, it, n):
  108. return _chunks.apply_chunks(task, it, n)
  109. return chunks
  110. @connect_on_app_finalize
  111. def add_group_task(app):
  112. _app = app
  113. from celery.canvas import maybe_signature, signature
  114. from celery.result import result_from_tuple
  115. class Group(app.Task):
  116. app = _app
  117. name = 'celery.group'
  118. accept_magic_kwargs = False
  119. _decorated = True
  120. def run(self, tasks, result, group_id, partial_args,
  121. add_to_parent=True):
  122. app = self.app
  123. result = result_from_tuple(result, app)
  124. # any partial args are added to all tasks in the group
  125. taskit = (signature(task, app=app).clone(partial_args)
  126. for i, task in enumerate(tasks))
  127. if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
  128. return app.GroupResult(
  129. result.id,
  130. [stask.apply(group_id=group_id) for stask in taskit],
  131. )
  132. with app.producer_or_acquire() as pub:
  133. [stask.apply_async(group_id=group_id, producer=pub,
  134. add_to_parent=False) for stask in taskit]
  135. parent = get_current_worker_task()
  136. if add_to_parent and parent:
  137. parent.add_trail(result)
  138. return result
  139. def prepare(self, options, tasks, args, **kwargs):
  140. options['group_id'] = group_id = (
  141. options.setdefault('task_id', uuid()))
  142. def prepare_member(task):
  143. task = maybe_signature(task, app=self.app)
  144. task.options['group_id'] = group_id
  145. return task, task.freeze()
  146. try:
  147. tasks, res = list(zip(
  148. *[prepare_member(task) for task in tasks]
  149. ))
  150. except ValueError: # tasks empty
  151. tasks, res = [], []
  152. return (tasks, self.app.GroupResult(group_id, res), group_id, args)
  153. def apply_async(self, partial_args=(), kwargs={}, **options):
  154. if self.app.conf.CELERY_ALWAYS_EAGER:
  155. return self.apply(partial_args, kwargs, **options)
  156. tasks, result, gid, args = self.prepare(
  157. options, args=partial_args, **kwargs
  158. )
  159. super(Group, self).apply_async((
  160. list(tasks), result.as_tuple(), gid, args), **options
  161. )
  162. return result
  163. def apply(self, args=(), kwargs={}, **options):
  164. return super(Group, self).apply(
  165. self.prepare(options, args=args, **kwargs),
  166. **options).get()
  167. return Group
@connect_on_app_finalize
def add_chain_task(app):
    """Create the ``celery.chain`` task class bound to ``app``.

    Returns the ``Chain`` task class.

    """
    from celery.canvas import (
        Signature, chain, chord, group, maybe_signature, maybe_unroll_group,
    )

    _app = app

    class Chain(app.Task):
        app = _app
        name = 'celery.chain'
        accept_magic_kwargs = False
        _decorated = True

        def prepare_steps(self, args, tasks):
            """Clone, freeze and link the steps of the chain.

            :param args: partial arguments, given to the first step only.
            :param tasks: the steps of the chain, in order.

            Returns the tuple ``(tasks, results)``: the prepared steps and
            the results obtained by freezing them.

            """
            app = self.app
            steps = deque(tasks)
            next_step = prev_task = prev_res = None
            tasks, results = [], []
            i = 0
            while steps:
                # First task gets partial args from chain.
                task = maybe_signature(steps.popleft(), app=app)
                task = task.clone() if i else task.clone(args)
                res = task.freeze()
                i += 1

                if isinstance(task, group):
                    task = maybe_unroll_group(task)
                if isinstance(task, chain):
                    # splice the chain: its steps are put back at the
                    # front of the queue, in their original order.
                    steps.extendleft(reversed(task.tasks))
                    continue

                elif isinstance(task, group) and steps and \
                        not isinstance(steps[0], group):
                    # automatically upgrade group(..) | s to chord(group, s)
                    try:
                        next_step = steps.popleft()
                        # for chords we freeze by pretending it's a normal
                        # task instead of a group.
                        res = Signature.freeze(next_step)
                        task = chord(task, body=next_step, task_id=res.task_id)
                    except IndexError:
                        pass  # no callback, so keep as group
                if prev_task:
                    # link previous task to this task.
                    prev_task.link(task)
                    # set the results parent attribute.
                    if not res.parent:
                        res.parent = prev_res

                # a step directly following a chord is only linked to it
                # (above), it is not added to the returned lists.
                if not isinstance(prev_task, chord):
                    results.append(res)
                    tasks.append(task)
                prev_task, prev_res = task, res

            return tasks, results

        def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
                        task_id=None, link=None, link_error=None, **options):
            """Send the first step of the chain.

            ``kwargs`` must contain ``tasks``, the steps of the chain.
            ``group_id``, ``chord``, ``task_id`` and ``link`` are set on
            the last step; ``link_error`` is set on every step.

            Returns the result of the last prepared step, or the value of
            :meth:`apply` when ``CELERY_ALWAYS_EAGER`` is enabled.

            """
            if self.app.conf.CELERY_ALWAYS_EAGER:
                return self.apply(args, kwargs, **options)
            # the ``publisher`` option is discarded, not forwarded.
            options.pop('publisher', None)
            tasks, results = self.prepare_steps(args, kwargs['tasks'])
            result = results[-1]
            if group_id:
                tasks[-1].set(group_id=group_id)
            if chord:
                tasks[-1].set(chord=chord)
            if task_id:
                tasks[-1].set(task_id=task_id)
                # the returned result must refer to the new id.
                result = tasks[-1].type.AsyncResult(task_id)
            # make sure we can do a link() and link_error() on a chain object.
            if link:
                tasks[-1].set(link=link)
            # and if any task in the chain fails, call the errbacks
            if link_error:
                for task in tasks:
                    task.set(link_error=link_error)
            # only the first step is sent here; the other steps were
            # linked to their predecessor by ``prepare_steps``.
            tasks[0].apply_async(**options)
            return result

        def apply(self, args=(), kwargs={}, signature=maybe_signature,
                  **options):
            """Apply the steps in ``kwargs['tasks']`` one after the other.

            Every step after the first is applied with the value of the
            previous result as its only positional argument, and each
            result's ``parent`` is set to the previous result.

            Returns the result of the last step (``None`` if there are
            no steps).

            """
            app = self.app
            last, fargs = None, args  # fargs passed to first task only
            for task in kwargs['tasks']:
                res = signature(task, app=app).clone(fargs).apply(
                    last and (last.get(), ),
                )
                res.parent, last, fargs = last, res, None
            return last

    return Chain
  253. @connect_on_app_finalize
  254. def add_chord_task(app):
  255. """Every chord is executed in a dedicated task, so that the chord
  256. can be used as a signature, and this generates the task
  257. responsible for that."""
  258. from celery import group
  259. from celery.canvas import maybe_signature
  260. _app = app
  261. default_propagate = app.conf.CELERY_CHORD_PROPAGATES
  262. class Chord(app.Task):
  263. app = _app
  264. name = 'celery.chord'
  265. accept_magic_kwargs = False
  266. ignore_result = False
  267. _decorated = True
  268. def run(self, header, body, partial_args=(), interval=None,
  269. countdown=1, max_retries=None, propagate=None,
  270. eager=False, **kwargs):
  271. app = self.app
  272. propagate = default_propagate if propagate is None else propagate
  273. group_id = uuid()
  274. # - convert back to group if serialized
  275. tasks = header.tasks if isinstance(header, group) else header
  276. header = group([
  277. maybe_signature(s, app=app).clone() for s in tasks
  278. ], app=self.app)
  279. # - eager applies the group inline
  280. if eager:
  281. return header.apply(args=partial_args, task_id=group_id)
  282. body.setdefault('chord_size', len(header.tasks))
  283. results = header.freeze(group_id=group_id, chord=body).results
  284. return self.backend.apply_chord(
  285. header, partial_args, group_id,
  286. body, interval=interval, countdown=countdown,
  287. max_retries=max_retries, propagate=propagate, result=results,
  288. )
  289. def apply_async(self, args=(), kwargs={}, task_id=None,
  290. group_id=None, chord=None, **options):
  291. app = self.app
  292. if app.conf.CELERY_ALWAYS_EAGER:
  293. return self.apply(args, kwargs, **options)
  294. header = kwargs.pop('header')
  295. body = kwargs.pop('body')
  296. header, body = (maybe_signature(header, app=app),
  297. maybe_signature(body, app=app))
  298. # forward certain options to body
  299. if chord is not None:
  300. body.options['chord'] = chord
  301. if group_id is not None:
  302. body.options['group_id'] = group_id
  303. [body.link(s) for s in options.pop('link', [])]
  304. [body.link_error(s) for s in options.pop('link_error', [])]
  305. body_result = body.freeze(task_id)
  306. parent = super(Chord, self).apply_async((header, body, args),
  307. kwargs, **options)
  308. body_result.parent = parent
  309. return body_result
  310. def apply(self, args=(), kwargs={}, propagate=True, **options):
  311. body = kwargs['body']
  312. res = super(Chord, self).apply(args, dict(kwargs, eager=True),
  313. **options)
  314. return maybe_signature(body, app=self.app).apply(
  315. args=(res.get(propagate=propagate).get(), ))
  316. return Chord