builtins.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.builtins
  4. ~~~~~~~~~~~~~~~~~~~
  5. Built-in tasks that are always available in all
  6. app instances. E.g. chord, group and xmap.
  7. """
  8. from __future__ import absolute_import
  9. from collections import deque
  10. from celery._state import get_current_worker_task
  11. from celery.utils import uuid
  12. from celery.utils.log import get_logger
  13. __all__ = ['shared_task', 'load_shared_tasks']
  14. logger = get_logger(__name__)
  15. #: global list of functions defining tasks that should be
  16. #: added to all apps.
  17. _shared_tasks = set()
  18. def shared_task(constructor):
  19. """Decorator that specifies a function that generates a built-in task.
  20. The function will then be called for every new app instance created
  21. (lazily, so more exactly when the task registry for that app is needed).
  22. The function must take a single ``app`` argument.
  23. """
  24. _shared_tasks.add(constructor)
  25. return constructor
  26. def load_shared_tasks(app):
  27. """Create built-in tasks for an app instance."""
  28. constructors = set(_shared_tasks)
  29. for constructor in constructors:
  30. constructor(app)
  31. @shared_task
  32. def add_backend_cleanup_task(app):
  33. """The backend cleanup task can be used to clean up the default result
  34. backend.
  35. If the configured backend requires periodic cleanup this task is also
  36. automatically configured to run every day at midnight (requires
  37. :program:`celery beat` to be running).
  38. """
  39. @app.task(name='celery.backend_cleanup',
  40. shared=False, _force_evaluate=True)
  41. def backend_cleanup():
  42. app.backend.cleanup()
  43. return backend_cleanup
  44. @shared_task
  45. def add_unlock_chord_task(app):
  46. """This task is used by result backends without native chord support.
  47. It joins chords by creating a task chain polling the header for completion.
  48. """
  49. from celery.canvas import signature
  50. from celery.exceptions import ChordError
  51. from celery.result import allow_join_result, result_from_tuple
  52. default_propagate = app.conf.CELERY_CHORD_PROPAGATES
  53. @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
  54. default_retry_delay=1, ignore_result=True, _force_evaluate=True)
  55. def unlock_chord(group_id, callback, interval=None, propagate=None,
  56. max_retries=None, result=None,
  57. Result=app.AsyncResult, GroupResult=app.GroupResult,
  58. result_from_tuple=result_from_tuple):
  59. # if propagate is disabled exceptions raised by chord tasks
  60. # will be sent as part of the result list to the chord callback.
  61. # Since 3.1 propagate will be enabled by default, and instead
  62. # the chord callback changes state to FAILURE with the
  63. # exception set to ChordError.
  64. propagate = default_propagate if propagate is None else propagate
  65. if interval is None:
  66. interval = unlock_chord.default_retry_delay
  67. # check if the task group is ready, and if so apply the callback.
  68. deps = GroupResult(
  69. group_id,
  70. [result_from_tuple(r, app=app) for r in result],
  71. )
  72. j = deps.join_native if deps.supports_native_join else deps.join
  73. if deps.ready():
  74. callback = signature(callback, app=app)
  75. try:
  76. with allow_join_result():
  77. ret = j(timeout=3.0, propagate=propagate)
  78. except Exception as exc:
  79. try:
  80. culprit = next(deps._failed_join_report())
  81. reason = 'Dependency {0.id} raised {1!r}'.format(
  82. culprit, exc,
  83. )
  84. except StopIteration:
  85. reason = repr(exc)
  86. logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
  87. app.backend.chord_error_from_stack(callback,
  88. ChordError(reason))
  89. else:
  90. try:
  91. callback.delay(ret)
  92. except Exception as exc:
  93. logger.error('Chord %r raised: %r', group_id, exc,
  94. exc_info=1)
  95. app.backend.chord_error_from_stack(
  96. callback,
  97. exc=ChordError('Callback error: {0!r}'.format(exc)),
  98. )
  99. else:
  100. raise unlock_chord.retry(countdown=interval,
  101. max_retries=max_retries)
  102. return unlock_chord
  103. @shared_task
  104. def add_map_task(app):
  105. from celery.canvas import signature
  106. @app.task(name='celery.map', shared=False, _force_evaluate=True)
  107. def xmap(task, it):
  108. task = signature(task, app=app).type
  109. return [task(item) for item in it]
  110. return xmap
  111. @shared_task
  112. def add_starmap_task(app):
  113. from celery.canvas import signature
  114. @app.task(name='celery.starmap', shared=False, _force_evaluate=True)
  115. def xstarmap(task, it):
  116. task = signature(task, app=app).type
  117. return [task(*item) for item in it]
  118. return xstarmap
  119. @shared_task
  120. def add_chunk_task(app):
  121. from celery.canvas import chunks as _chunks
  122. @app.task(name='celery.chunks', shared=False, _force_evaluate=True)
  123. def chunks(task, it, n):
  124. return _chunks.apply_chunks(task, it, n)
  125. return chunks
  126. @shared_task
  127. def add_group_task(app):
  128. _app = app
  129. from celery.canvas import maybe_signature, signature
  130. from celery.result import result_from_tuple
  131. class Group(app.Task):
  132. app = _app
  133. name = 'celery.group'
  134. accept_magic_kwargs = False
  135. _decorated = True
  136. def run(self, tasks, result, group_id, partial_args,
  137. add_to_parent=True):
  138. app = self.app
  139. result = result_from_tuple(result, app)
  140. # any partial args are added to all tasks in the group
  141. taskit = (signature(task, app=app).clone(partial_args)
  142. for i, task in enumerate(tasks))
  143. if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
  144. return app.GroupResult(
  145. result.id,
  146. [stask.apply(group_id=group_id) for stask in taskit],
  147. )
  148. with app.producer_or_acquire() as pub:
  149. [stask.apply_async(group_id=group_id, producer=pub,
  150. add_to_parent=False) for stask in taskit]
  151. parent = get_current_worker_task()
  152. if add_to_parent and parent:
  153. parent.add_trail(result)
  154. return result
  155. def prepare(self, options, tasks, args, **kwargs):
  156. options['group_id'] = group_id = (
  157. options.setdefault('task_id', uuid()))
  158. def prepare_member(task):
  159. task = maybe_signature(task, app=self.app)
  160. task.options['group_id'] = group_id
  161. return task, task.freeze()
  162. try:
  163. tasks, res = list(zip(
  164. *[prepare_member(task) for task in tasks]
  165. ))
  166. except ValueError: # tasks empty
  167. tasks, res = [], []
  168. return (tasks, self.app.GroupResult(group_id, res), group_id, args)
  169. def apply_async(self, partial_args=(), kwargs={}, **options):
  170. if self.app.conf.CELERY_ALWAYS_EAGER:
  171. return self.apply(partial_args, kwargs, **options)
  172. tasks, result, gid, args = self.prepare(
  173. options, args=partial_args, **kwargs
  174. )
  175. super(Group, self).apply_async((
  176. list(tasks), result.as_tuple(), gid, args), **options
  177. )
  178. return result
  179. def apply(self, args=(), kwargs={}, **options):
  180. return super(Group, self).apply(
  181. self.prepare(options, args=args, **kwargs),
  182. **options).get()
  183. return Group
  184. @shared_task
  185. def add_chain_task(app):
  186. from celery.canvas import (
  187. Signature, chain, chord, group, maybe_signature, maybe_unroll_group,
  188. )
  189. _app = app
  190. class Chain(app.Task):
  191. app = _app
  192. name = 'celery.chain'
  193. accept_magic_kwargs = False
  194. _decorated = True
  195. def prepare_steps(self, args, tasks):
  196. app = self.app
  197. steps = deque(tasks)
  198. next_step = prev_task = prev_res = None
  199. tasks, results = [], []
  200. i = 0
  201. while steps:
  202. # First task get partial args from chain.
  203. task = maybe_signature(steps.popleft(), app=app)
  204. task = task.clone() if i else task.clone(args)
  205. res = task.freeze()
  206. i += 1
  207. if isinstance(task, group):
  208. task = maybe_unroll_group(task)
  209. if isinstance(task, chain):
  210. # splice the chain
  211. steps.extendleft(reversed(task.tasks))
  212. continue
  213. elif isinstance(task, group) and steps and \
  214. not isinstance(steps[0], group):
  215. # automatically upgrade group(..) | s to chord(group, s)
  216. try:
  217. next_step = steps.popleft()
  218. # for chords we freeze by pretending it's a normal
  219. # task instead of a group.
  220. res = Signature.freeze(next_step)
  221. task = chord(task, body=next_step, task_id=res.task_id)
  222. except IndexError:
  223. pass # no callback, so keep as group
  224. if prev_task:
  225. # link previous task to this task.
  226. prev_task.link(task)
  227. # set the results parent attribute.
  228. if not res.parent:
  229. res.parent = prev_res
  230. if not isinstance(prev_task, chord):
  231. results.append(res)
  232. tasks.append(task)
  233. prev_task, prev_res = task, res
  234. return tasks, results
  235. def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
  236. task_id=None, link=None, link_error=None, **options):
  237. if self.app.conf.CELERY_ALWAYS_EAGER:
  238. return self.apply(args, kwargs, **options)
  239. options.pop('publisher', None)
  240. tasks, results = self.prepare_steps(args, kwargs['tasks'])
  241. result = results[-1]
  242. if group_id:
  243. tasks[-1].set(group_id=group_id)
  244. if chord:
  245. tasks[-1].set(chord=chord)
  246. if task_id:
  247. tasks[-1].set(task_id=task_id)
  248. result = tasks[-1].type.AsyncResult(task_id)
  249. # make sure we can do a link() and link_error() on a chain object.
  250. if link:
  251. tasks[-1].set(link=link)
  252. # and if any task in the chain fails, call the errbacks
  253. if link_error:
  254. for task in tasks:
  255. task.set(link_error=link_error)
  256. tasks[0].apply_async(**options)
  257. return result
  258. def apply(self, args=(), kwargs={}, signature=maybe_signature,
  259. **options):
  260. app = self.app
  261. last, fargs = None, args # fargs passed to first task only
  262. for task in kwargs['tasks']:
  263. res = signature(task, app=app).clone(fargs).apply(
  264. last and (last.get(), ),
  265. )
  266. res.parent, last, fargs = last, res, None
  267. return last
  268. return Chain
  269. @shared_task
  270. def add_chord_task(app):
  271. """Every chord is executed in a dedicated task, so that the chord
  272. can be used as a signature, and this generates the task
  273. responsible for that."""
  274. from celery import group
  275. from celery.canvas import maybe_signature
  276. _app = app
  277. default_propagate = app.conf.CELERY_CHORD_PROPAGATES
  278. class Chord(app.Task):
  279. app = _app
  280. name = 'celery.chord'
  281. accept_magic_kwargs = False
  282. ignore_result = False
  283. _decorated = True
  284. def run(self, header, body, partial_args=(), interval=None,
  285. countdown=1, max_retries=None, propagate=None,
  286. eager=False, **kwargs):
  287. app = self.app
  288. propagate = default_propagate if propagate is None else propagate
  289. group_id = uuid()
  290. # - convert back to group if serialized
  291. tasks = header.tasks if isinstance(header, group) else header
  292. header = group([
  293. maybe_signature(s, app=app).clone() for s in tasks
  294. ], app=self.app)
  295. # - eager applies the group inline
  296. if eager:
  297. return header.apply(args=partial_args, task_id=group_id)
  298. body.setdefault('chord_size', len(header.tasks))
  299. results = header.freeze(group_id=group_id, chord=body).results
  300. return self.backend.apply_chord(
  301. header, partial_args, group_id,
  302. body, interval=interval, countdown=countdown,
  303. max_retries=max_retries, propagate=propagate, result=results,
  304. )
  305. def apply_async(self, args=(), kwargs={}, task_id=None,
  306. group_id=None, chord=None, **options):
  307. app = self.app
  308. if app.conf.CELERY_ALWAYS_EAGER:
  309. return self.apply(args, kwargs, **options)
  310. header = kwargs.pop('header')
  311. body = kwargs.pop('body')
  312. header, body = (maybe_signature(header, app=app),
  313. maybe_signature(body, app=app))
  314. # forward certain options to body
  315. if chord is not None:
  316. body.options['chord'] = chord
  317. if group_id is not None:
  318. body.options['group_id'] = group_id
  319. [body.link(s) for s in options.pop('link', [])]
  320. [body.link_error(s) for s in options.pop('link_error', [])]
  321. body_result = body.freeze(task_id)
  322. parent = super(Chord, self).apply_async((header, body, args),
  323. kwargs, **options)
  324. body_result.parent = parent
  325. return body_result
  326. def apply(self, args=(), kwargs={}, propagate=True, **options):
  327. body = kwargs['body']
  328. res = super(Chord, self).apply(args, dict(kwargs, eager=True),
  329. **options)
  330. return maybe_signature(body, app=self.app).apply(
  331. args=(res.get(propagate=propagate).get(), ))
  332. return Chord