builtins.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.builtins
  4. ~~~~~~~~~~~~~~~~~~~
  5. Built-in tasks that are always available in all
  6. app instances. E.g. chord, group and xmap.
  7. """
  8. from __future__ import absolute_import
  9. from collections import deque
  10. from celery._state import get_current_worker_task
  11. from celery.utils import uuid
  12. __all__ = ['shared_task', 'load_shared_tasks']
  13. #: global list of functions defining tasks that should be
  14. #: added to all apps.
  15. _shared_tasks = set()
  16. def shared_task(constructor):
  17. """Decorator that specifies a function that generates a built-in task.
  18. The function will then be called for every new app instance created
  19. (lazily, so more exactly when the task registry for that app is needed).
  20. The function must take a single ``app`` argument.
  21. """
  22. _shared_tasks.add(constructor)
  23. return constructor
  24. def load_shared_tasks(app):
  25. """Create built-in tasks for an app instance."""
  26. constructors = set(_shared_tasks)
  27. for constructor in constructors:
  28. constructor(app)
  29. @shared_task
  30. def add_backend_cleanup_task(app):
  31. """The backend cleanup task can be used to clean up the default result
  32. backend.
  33. If the configured backend requires periodic cleanup this task is also
  34. automatically configured to run every day at midnight (requires
  35. :program:`celery beat` to be running).
  36. """
  37. @app.task(name='celery.backend_cleanup',
  38. shared=False, _force_evaluate=True)
  39. def backend_cleanup():
  40. app.backend.cleanup()
  41. return backend_cleanup
  42. @shared_task
  43. def add_unlock_chord_task(app):
  44. """This task is used by result backends without native chord support.
  45. It joins chords by creating a task chain polling the header for completion.
  46. """
  47. from celery.canvas import signature
  48. from celery.exceptions import ChordError
  49. from celery.result import result_from_tuple
  50. default_propagate = app.conf.CELERY_CHORD_PROPAGATES
  51. @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
  52. default_retry_delay=1, ignore_result=True, _force_evaluate=True)
  53. def unlock_chord(group_id, callback, interval=None, propagate=None,
  54. max_retries=None, result=None,
  55. Result=app.AsyncResult, GroupResult=app.GroupResult,
  56. result_from_tuple=result_from_tuple):
  57. # if propagate is disabled exceptions raised by chord tasks
  58. # will be sent as part of the result list to the chord callback.
  59. # Since 3.1 propagate will be enabled by default, and instead
  60. # the chord callback changes state to FAILURE with the
  61. # exception set to ChordError.
  62. propagate = default_propagate if propagate is None else propagate
  63. if interval is None:
  64. interval = unlock_chord.default_retry_delay
  65. # check if the task group is ready, and if so apply the callback.
  66. deps = GroupResult(
  67. group_id,
  68. [result_from_tuple(r, app=app) for r in result],
  69. )
  70. j = deps.join_native if deps.supports_native_join else deps.join
  71. if deps.ready():
  72. callback = signature(callback, app=app)
  73. try:
  74. ret = j(propagate=propagate)
  75. except Exception as exc:
  76. try:
  77. culprit = next(deps._failed_join_report())
  78. reason = 'Dependency {0.id} raised {1!r}'.format(
  79. culprit, exc,
  80. )
  81. except StopIteration:
  82. reason = repr(exc)
  83. app._tasks[callback.task].backend.fail_from_current_stack(
  84. callback.id, exc=ChordError(reason),
  85. )
  86. else:
  87. try:
  88. callback.delay(ret)
  89. except Exception as exc:
  90. app._tasks[callback.task].backend.fail_from_current_stack(
  91. callback.id,
  92. exc=ChordError('Callback error: {0!r}'.format(exc)),
  93. )
  94. else:
  95. return unlock_chord.retry(countdown=interval,
  96. max_retries=max_retries)
  97. return unlock_chord
  98. @shared_task
  99. def add_map_task(app):
  100. from celery.canvas import signature
  101. @app.task(name='celery.map', shared=False, _force_evaluate=True)
  102. def xmap(task, it):
  103. task = signature(task, app=app).type
  104. return [task(item) for item in it]
  105. return xmap
  106. @shared_task
  107. def add_starmap_task(app):
  108. from celery.canvas import signature
  109. @app.task(name='celery.starmap', shared=False, _force_evaluate=True)
  110. def xstarmap(task, it):
  111. task = signature(task, app=app).type
  112. return [task(*item) for item in it]
  113. return xstarmap
  114. @shared_task
  115. def add_chunk_task(app):
  116. from celery.canvas import chunks as _chunks
  117. @app.task(name='celery.chunks', shared=False, _force_evaluate=True)
  118. def chunks(task, it, n):
  119. return _chunks.apply_chunks(task, it, n)
  120. return chunks
  121. @shared_task
  122. def add_group_task(app):
  123. _app = app
  124. from celery.canvas import maybe_signature, signature
  125. from celery.result import result_from_tuple
  126. class Group(app.Task):
  127. app = _app
  128. name = 'celery.group'
  129. accept_magic_kwargs = False
  130. _decorated = True
  131. def run(self, tasks, result, group_id, partial_args):
  132. app = self.app
  133. result = result_from_tuple(result, app)
  134. # any partial args are added to all tasks in the group
  135. taskit = (signature(task, app=app).clone(partial_args)
  136. for i, task in enumerate(tasks))
  137. if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
  138. return app.GroupResult(
  139. result.id,
  140. [stask.apply(group_id=group_id) for stask in taskit],
  141. )
  142. with app.producer_or_acquire() as pub:
  143. [stask.apply_async(group_id=group_id, publisher=pub,
  144. add_to_parent=False) for stask in taskit]
  145. parent = get_current_worker_task()
  146. if parent:
  147. parent.add_trail(result)
  148. return result
  149. def prepare(self, options, tasks, args, **kwargs):
  150. options['group_id'] = group_id = (
  151. options.setdefault('task_id', uuid()))
  152. def prepare_member(task):
  153. task = maybe_signature(task, app=self.app)
  154. task.options['group_id'] = group_id
  155. return task, task.freeze()
  156. try:
  157. tasks, res = list(zip(
  158. *[prepare_member(task) for task in tasks]
  159. ))
  160. except ValueError: # tasks empty
  161. tasks, res = [], []
  162. return (tasks, self.app.GroupResult(group_id, res), group_id, args)
  163. def apply_async(self, partial_args=(), kwargs={}, **options):
  164. if self.app.conf.CELERY_ALWAYS_EAGER:
  165. return self.apply(partial_args, kwargs, **options)
  166. tasks, result, gid, args = self.prepare(
  167. options, args=partial_args, **kwargs
  168. )
  169. super(Group, self).apply_async((
  170. list(tasks), result.as_tuple(), gid, args), **options
  171. )
  172. return result
  173. def apply(self, args=(), kwargs={}, **options):
  174. return super(Group, self).apply(
  175. self.prepare(options, args=args, **kwargs),
  176. **options).get()
  177. return Group
  178. @shared_task
  179. def add_chain_task(app):
  180. from celery.canvas import Signature, chain, chord, group, maybe_signature
  181. _app = app
  182. class Chain(app.Task):
  183. app = _app
  184. name = 'celery.chain'
  185. accept_magic_kwargs = False
  186. _decorated = True
  187. def prepare_steps(self, args, tasks):
  188. app = self.app
  189. steps = deque(tasks)
  190. next_step = prev_task = prev_res = None
  191. tasks, results = [], []
  192. i = 0
  193. while steps:
  194. # First task get partial args from chain.
  195. task = maybe_signature(steps.popleft(), app=app)
  196. task = task.clone() if i else task.clone(args)
  197. res = task.freeze()
  198. i += 1
  199. if isinstance(task, chain):
  200. # splice the chain
  201. steps.extendleft(reversed(task.tasks))
  202. continue
  203. elif isinstance(task, group) and steps and \
  204. not isinstance(steps[0], group):
  205. # automatically upgrade group(..) | s to chord(group, s)
  206. try:
  207. next_step = steps.popleft()
  208. # for chords we freeze by pretending it's a normal
  209. # task instead of a group.
  210. res = Signature.freeze(next_step)
  211. task = chord(task, body=next_step, task_id=res.task_id)
  212. except IndexError:
  213. pass # no callback, so keep as group
  214. if prev_task:
  215. # link previous task to this task.
  216. prev_task.link(task)
  217. # set the results parent attribute.
  218. if not res.parent:
  219. res.parent = prev_res
  220. if not isinstance(prev_task, chord):
  221. results.append(res)
  222. tasks.append(task)
  223. prev_task, prev_res = task, res
  224. return tasks, results
  225. def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
  226. task_id=None, link=None, link_error=None, **options):
  227. if self.app.conf.CELERY_ALWAYS_EAGER:
  228. return self.apply(args, kwargs, **options)
  229. options.pop('publisher', None)
  230. tasks, results = self.prepare_steps(args, kwargs['tasks'])
  231. result = results[-1]
  232. if group_id:
  233. tasks[-1].set(group_id=group_id)
  234. if chord:
  235. tasks[-1].set(chord=chord)
  236. if task_id:
  237. tasks[-1].set(task_id=task_id)
  238. result = tasks[-1].type.AsyncResult(task_id)
  239. # make sure we can do a link() and link_error() on a chain object.
  240. if link:
  241. tasks[-1].set(link=link)
  242. # and if any task in the chain fails, call the errbacks
  243. if link_error:
  244. for task in tasks:
  245. task.set(link_error=link_error)
  246. tasks[0].apply_async()
  247. return result
  248. def apply(self, args=(), kwargs={}, signature=maybe_signature,
  249. **options):
  250. app = self.app
  251. last, fargs = None, args # fargs passed to first task only
  252. for task in kwargs['tasks']:
  253. res = signature(task, app=app).clone(fargs).apply(
  254. last and (last.get(), ),
  255. )
  256. res.parent, last, fargs = last, res, None
  257. return last
  258. return Chain
  259. @shared_task
  260. def add_chord_task(app):
  261. """Every chord is executed in a dedicated task, so that the chord
  262. can be used as a signature, and this generates the task
  263. responsible for that."""
  264. from celery import group
  265. from celery.canvas import maybe_signature
  266. _app = app
  267. default_propagate = app.conf.CELERY_CHORD_PROPAGATES
  268. class Chord(app.Task):
  269. app = _app
  270. name = 'celery.chord'
  271. accept_magic_kwargs = False
  272. ignore_result = False
  273. _decorated = True
  274. def run(self, header, body, partial_args=(), interval=None,
  275. countdown=1, max_retries=None, propagate=None,
  276. eager=False, **kwargs):
  277. app = self.app
  278. propagate = default_propagate if propagate is None else propagate
  279. group_id = uuid()
  280. AsyncResult = app.AsyncResult
  281. prepare_member = self._prepare_member
  282. # - convert back to group if serialized
  283. tasks = header.tasks if isinstance(header, group) else header
  284. header = group([
  285. maybe_signature(s, app=app).clone() for s in tasks
  286. ])
  287. # - eager applies the group inline
  288. if eager:
  289. return header.apply(args=partial_args, task_id=group_id)
  290. results = [AsyncResult(prepare_member(task, body, group_id))
  291. for task in header.tasks]
  292. # - fallback implementations schedules the chord_unlock task here
  293. app.backend.on_chord_apply(group_id, body,
  294. interval=interval,
  295. countdown=countdown,
  296. max_retries=max_retries,
  297. propagate=propagate,
  298. result=results)
  299. # - call the header group, returning the GroupResult.
  300. final_res = header(*partial_args, task_id=group_id)
  301. return final_res
  302. def _prepare_member(self, task, body, group_id):
  303. opts = task.options
  304. # d.setdefault would work but generating uuid's are expensive
  305. try:
  306. task_id = opts['task_id']
  307. except KeyError:
  308. task_id = opts['task_id'] = uuid()
  309. opts.update(chord=body, group_id=group_id)
  310. return task_id
  311. def apply_async(self, args=(), kwargs={}, task_id=None,
  312. group_id=None, chord=None, **options):
  313. app = self.app
  314. if app.conf.CELERY_ALWAYS_EAGER:
  315. return self.apply(args, kwargs, **options)
  316. header = kwargs.pop('header')
  317. body = kwargs.pop('body')
  318. header, body = (maybe_signature(header, app=app),
  319. maybe_signature(body, app=app))
  320. # forward certain options to body
  321. if chord is not None:
  322. body.options['chord'] = chord
  323. if group_id is not None:
  324. body.options['group_id'] = group_id
  325. [body.link(s) for s in options.pop('link', [])]
  326. [body.link_error(s) for s in options.pop('link_error', [])]
  327. body_result = body.freeze(task_id)
  328. parent = super(Chord, self).apply_async((header, body, args),
  329. kwargs, **options)
  330. body_result.parent = parent
  331. return body_result
  332. def apply(self, args=(), kwargs={}, propagate=True, **options):
  333. body = kwargs['body']
  334. res = super(Chord, self).apply(args, dict(kwargs, eager=True),
  335. **options)
  336. return maybe_signature(body, app=self.app).apply(
  337. args=(res.get(propagate=propagate).get(), ))
  338. return Chord