builtins.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.builtins
  4. ~~~~~~~~~~~~~~~~~~~
  5. Built-in tasks that are always available in all
  6. app instances. E.g. chord, group and xmap.
  7. """
  8. from __future__ import absolute_import
  9. from collections import deque
  10. from celery._state import get_current_worker_task
  11. from celery.utils import uuid
  12. __all__ = ['shared_task', 'load_shared_tasks']
  13. #: global list of functions defining tasks that should be
  14. #: added to all apps.
  15. _shared_tasks = set()
  16. def shared_task(constructor):
  17. """Decorator that specifies a function that generates a built-in task.
  18. The function will then be called for every new app instance created
  19. (lazily, so more exactly when the task registry for that app is needed).
  20. The function must take a single ``app`` argument.
  21. """
  22. _shared_tasks.add(constructor)
  23. return constructor
  24. def load_shared_tasks(app):
  25. """Create built-in tasks for an app instance."""
  26. constructors = set(_shared_tasks)
  27. for constructor in constructors:
  28. constructor(app)
  29. @shared_task
  30. def add_backend_cleanup_task(app):
  31. """The backend cleanup task can be used to clean up the default result
  32. backend.
  33. If the configured backend requires periodic cleanup this task is also
  34. automatically configured to run every day at midnight (requires
  35. :program:`celery beat` to be running).
  36. """
  37. @app.task(name='celery.backend_cleanup',
  38. shared=False, _force_evaluate=True)
  39. def backend_cleanup():
  40. app.backend.cleanup()
  41. return backend_cleanup
  42. @shared_task
  43. def add_unlock_chord_task(app):
  44. """This task is used by result backends without native chord support.
  45. It joins chords by creating a task chain polling the header for completion.
  46. """
  47. from celery.canvas import signature
  48. from celery.exceptions import ChordError
  49. from celery.result import allow_join_result, result_from_tuple
  50. default_propagate = app.conf.CELERY_CHORD_PROPAGATES
  51. @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
  52. default_retry_delay=1, ignore_result=True, _force_evaluate=True)
  53. def unlock_chord(group_id, callback, interval=None, propagate=None,
  54. max_retries=None, result=None,
  55. Result=app.AsyncResult, GroupResult=app.GroupResult,
  56. result_from_tuple=result_from_tuple):
  57. # if propagate is disabled exceptions raised by chord tasks
  58. # will be sent as part of the result list to the chord callback.
  59. # Since 3.1 propagate will be enabled by default, and instead
  60. # the chord callback changes state to FAILURE with the
  61. # exception set to ChordError.
  62. propagate = default_propagate if propagate is None else propagate
  63. if interval is None:
  64. interval = unlock_chord.default_retry_delay
  65. # check if the task group is ready, and if so apply the callback.
  66. deps = GroupResult(
  67. group_id,
  68. [result_from_tuple(r, app=app) for r in result],
  69. )
  70. j = deps.join_native if deps.supports_native_join else deps.join
  71. if deps.ready():
  72. callback = signature(callback, app=app)
  73. try:
  74. with allow_join_result():
  75. ret = j(timeout=3.0, propagate=propagate)
  76. except Exception as exc:
  77. try:
  78. culprit = next(deps._failed_join_report())
  79. reason = 'Dependency {0.id} raised {1!r}'.format(
  80. culprit, exc,
  81. )
  82. except StopIteration:
  83. reason = repr(exc)
  84. app._tasks[callback.task].backend.fail_from_current_stack(
  85. callback.id, exc=ChordError(reason),
  86. )
  87. else:
  88. try:
  89. callback.delay(ret)
  90. except Exception as exc:
  91. app._tasks[callback.task].backend.fail_from_current_stack(
  92. callback.id,
  93. exc=ChordError('Callback error: {0!r}'.format(exc)),
  94. )
  95. else:
  96. raise unlock_chord.retry(countdown=interval,
  97. max_retries=max_retries)
  98. return unlock_chord
  99. @shared_task
  100. def add_map_task(app):
  101. from celery.canvas import signature
  102. @app.task(name='celery.map', shared=False, _force_evaluate=True)
  103. def xmap(task, it):
  104. task = signature(task, app=app).type
  105. return [task(item) for item in it]
  106. return xmap
  107. @shared_task
  108. def add_starmap_task(app):
  109. from celery.canvas import signature
  110. @app.task(name='celery.starmap', shared=False, _force_evaluate=True)
  111. def xstarmap(task, it):
  112. task = signature(task, app=app).type
  113. return [task(*item) for item in it]
  114. return xstarmap
  115. @shared_task
  116. def add_chunk_task(app):
  117. from celery.canvas import chunks as _chunks
  118. @app.task(name='celery.chunks', shared=False, _force_evaluate=True)
  119. def chunks(task, it, n):
  120. return _chunks.apply_chunks(task, it, n)
  121. return chunks
  122. @shared_task
  123. def add_group_task(app):
  124. _app = app
  125. from celery.canvas import maybe_signature, signature
  126. from celery.result import result_from_tuple
  127. class Group(app.Task):
  128. app = _app
  129. name = 'celery.group'
  130. accept_magic_kwargs = False
  131. _decorated = True
  132. def run(self, tasks, result, group_id, partial_args):
  133. app = self.app
  134. result = result_from_tuple(result, app)
  135. # any partial args are added to all tasks in the group
  136. taskit = (signature(task, app=app).clone(partial_args)
  137. for i, task in enumerate(tasks))
  138. if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
  139. return app.GroupResult(
  140. result.id,
  141. [stask.apply(group_id=group_id) for stask in taskit],
  142. )
  143. with app.producer_or_acquire() as pub:
  144. [stask.apply_async(group_id=group_id, publisher=pub,
  145. add_to_parent=False) for stask in taskit]
  146. parent = get_current_worker_task()
  147. if parent:
  148. parent.add_trail(result)
  149. return result
  150. def prepare(self, options, tasks, args, **kwargs):
  151. options['group_id'] = group_id = (
  152. options.setdefault('task_id', uuid()))
  153. def prepare_member(task):
  154. task = maybe_signature(task, app=self.app)
  155. task.options['group_id'] = group_id
  156. return task, task.freeze()
  157. try:
  158. tasks, res = list(zip(
  159. *[prepare_member(task) for task in tasks]
  160. ))
  161. except ValueError: # tasks empty
  162. tasks, res = [], []
  163. return (tasks, self.app.GroupResult(group_id, res), group_id, args)
  164. def apply_async(self, partial_args=(), kwargs={}, **options):
  165. if self.app.conf.CELERY_ALWAYS_EAGER:
  166. return self.apply(partial_args, kwargs, **options)
  167. tasks, result, gid, args = self.prepare(
  168. options, args=partial_args, **kwargs
  169. )
  170. super(Group, self).apply_async((
  171. list(tasks), result.as_tuple(), gid, args), **options
  172. )
  173. return result
  174. def apply(self, args=(), kwargs={}, **options):
  175. return super(Group, self).apply(
  176. self.prepare(options, args=args, **kwargs),
  177. **options).get()
  178. return Group
  179. @shared_task
  180. def add_chain_task(app):
  181. from celery.canvas import (
  182. Signature, chain, chord, group, maybe_signature, maybe_unroll_group,
  183. )
  184. _app = app
  185. class Chain(app.Task):
  186. app = _app
  187. name = 'celery.chain'
  188. accept_magic_kwargs = False
  189. _decorated = True
  190. def prepare_steps(self, args, tasks):
  191. app = self.app
  192. steps = deque(tasks)
  193. next_step = prev_task = prev_res = None
  194. tasks, results = [], []
  195. i = 0
  196. while steps:
  197. # First task get partial args from chain.
  198. task = maybe_signature(steps.popleft(), app=app)
  199. task = task.clone() if i else task.clone(args)
  200. res = task.freeze()
  201. i += 1
  202. if isinstance(task, group):
  203. task = maybe_unroll_group(task)
  204. if isinstance(task, chain):
  205. # splice the chain
  206. steps.extendleft(reversed(task.tasks))
  207. continue
  208. elif isinstance(task, group) and steps and \
  209. not isinstance(steps[0], group):
  210. # automatically upgrade group(..) | s to chord(group, s)
  211. try:
  212. next_step = steps.popleft()
  213. # for chords we freeze by pretending it's a normal
  214. # task instead of a group.
  215. res = Signature.freeze(next_step)
  216. task = chord(task, body=next_step, task_id=res.task_id)
  217. except IndexError:
  218. pass # no callback, so keep as group
  219. if prev_task:
  220. # link previous task to this task.
  221. prev_task.link(task)
  222. # set the results parent attribute.
  223. if not res.parent:
  224. res.parent = prev_res
  225. if not isinstance(prev_task, chord):
  226. results.append(res)
  227. tasks.append(task)
  228. prev_task, prev_res = task, res
  229. return tasks, results
  230. def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
  231. task_id=None, link=None, link_error=None, **options):
  232. if self.app.conf.CELERY_ALWAYS_EAGER:
  233. return self.apply(args, kwargs, **options)
  234. options.pop('publisher', None)
  235. tasks, results = self.prepare_steps(args, kwargs['tasks'])
  236. result = results[-1]
  237. if group_id:
  238. tasks[-1].set(group_id=group_id)
  239. if chord:
  240. tasks[-1].set(chord=chord)
  241. if task_id:
  242. tasks[-1].set(task_id=task_id)
  243. result = tasks[-1].type.AsyncResult(task_id)
  244. # make sure we can do a link() and link_error() on a chain object.
  245. if link:
  246. tasks[-1].set(link=link)
  247. # and if any task in the chain fails, call the errbacks
  248. if link_error:
  249. for task in tasks:
  250. task.set(link_error=link_error)
  251. tasks[0].apply_async(**options)
  252. return result
  253. def apply(self, args=(), kwargs={}, signature=maybe_signature,
  254. **options):
  255. app = self.app
  256. last, fargs = None, args # fargs passed to first task only
  257. for task in kwargs['tasks']:
  258. res = signature(task, app=app).clone(fargs).apply(
  259. last and (last.get(), ),
  260. )
  261. res.parent, last, fargs = last, res, None
  262. return last
  263. return Chain
  264. @shared_task
  265. def add_chord_task(app):
  266. """Every chord is executed in a dedicated task, so that the chord
  267. can be used as a signature, and this generates the task
  268. responsible for that."""
  269. from celery import group
  270. from celery.canvas import maybe_signature
  271. _app = app
  272. default_propagate = app.conf.CELERY_CHORD_PROPAGATES
  273. class Chord(app.Task):
  274. app = _app
  275. name = 'celery.chord'
  276. accept_magic_kwargs = False
  277. ignore_result = False
  278. _decorated = True
  279. def run(self, header, body, partial_args=(), interval=None,
  280. countdown=1, max_retries=None, propagate=None,
  281. eager=False, **kwargs):
  282. app = self.app
  283. propagate = default_propagate if propagate is None else propagate
  284. group_id = uuid()
  285. AsyncResult = app.AsyncResult
  286. prepare_member = self._prepare_member
  287. # - convert back to group if serialized
  288. tasks = header.tasks if isinstance(header, group) else header
  289. header = group([
  290. maybe_signature(s, app=app).clone() for s in tasks
  291. ])
  292. # - eager applies the group inline
  293. if eager:
  294. return header.apply(args=partial_args, task_id=group_id)
  295. results = [AsyncResult(prepare_member(task, body, group_id))
  296. for task in header.tasks]
  297. return self.backend.apply_chord(
  298. header, partial_args, group_id,
  299. body, interval=interval, countdown=countdown,
  300. max_retries=max_retries, propagate=propagate, result=results,
  301. )
  302. def _prepare_member(self, task, body, group_id):
  303. opts = task.options
  304. # d.setdefault would work but generating uuid's are expensive
  305. try:
  306. task_id = opts['task_id']
  307. except KeyError:
  308. task_id = opts['task_id'] = uuid()
  309. opts.update(chord=body, group_id=group_id)
  310. return task_id
  311. def apply_async(self, args=(), kwargs={}, task_id=None,
  312. group_id=None, chord=None, **options):
  313. app = self.app
  314. if app.conf.CELERY_ALWAYS_EAGER:
  315. return self.apply(args, kwargs, **options)
  316. header = kwargs.pop('header')
  317. body = kwargs.pop('body')
  318. header, body = (maybe_signature(header, app=app),
  319. maybe_signature(body, app=app))
  320. # forward certain options to body
  321. if chord is not None:
  322. body.options['chord'] = chord
  323. if group_id is not None:
  324. body.options['group_id'] = group_id
  325. [body.link(s) for s in options.pop('link', [])]
  326. [body.link_error(s) for s in options.pop('link_error', [])]
  327. body_result = body.freeze(task_id)
  328. parent = super(Chord, self).apply_async((header, body, args),
  329. kwargs, **options)
  330. body_result.parent = parent
  331. return body_result
  332. def apply(self, args=(), kwargs={}, propagate=True, **options):
  333. body = kwargs['body']
  334. res = super(Chord, self).apply(args, dict(kwargs, eager=True),
  335. **options)
  336. return maybe_signature(body, app=self.app).apply(
  337. args=(res.get(propagate=propagate).get(), ))
  338. return Chord