# -*- coding: utf-8 -*-
"""
    celery.app.builtins
    ~~~~~~~~~~~~~~~~~~~

    Built-in tasks that are always available in all
    app instances, e.g. chord, group and xmap.

"""
from __future__ import absolute_import

from collections import deque

from celery._state import get_current_worker_task
from celery.utils import uuid

#: global list of functions defining tasks that should be
#: added to all apps.
_shared_tasks = []


def shared_task(constructor):
    """Decorator that specifies a function that generates a built-in task.

    The function will then be called for every new app instance created
    (lazily, so more exactly when the task registry for that app is needed).

    The function must take a single ``app`` argument.

    """
    _shared_tasks.append(constructor)
    return constructor
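
# A minimal usage sketch of the decorator above; the ``add_ping_task`` and
# ``ping`` names are hypothetical, shown only to illustrate the expected
# constructor shape (a callable taking the app and returning the task):
#
#     @shared_task
#     def add_ping_task(app):
#         @app.task(name='celery.ping', _force_evaluate=True)
#         def ping():
#             return 'pong'
#         return ping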

def load_shared_tasks(app):
    """Create built-in tasks for an app instance."""
    for constructor in _shared_tasks:
        constructor(app)


@shared_task
def add_backend_cleanup_task(app):
    """The backend cleanup task can be used to clean up the default result
    backend.

    If the configured backend requires periodic cleanup this task is also
    automatically configured to run every day at midnight (requires
    :program:`celery beat` to be running).

    """
    @app.task(name='celery.backend_cleanup', _force_evaluate=True)
    def backend_cleanup():
        app.backend.cleanup()
    return backend_cleanup
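
# Usage sketch: a beat entry like the one below is the kind of schedule
# referred to in the docstring above.  The time shown is illustrative,
# not necessarily the configured default:
#
#     from celery.schedules import crontab
#
#     app.conf.CELERYBEAT_SCHEDULE['celery.backend_cleanup'] = {
#         'task': 'celery.backend_cleanup',
#         'schedule': crontab(minute=0, hour=0),
#     }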

@shared_task
def add_unlock_chord_task(app):
    """This task is used by result backends without native chord support.

    It joins chords by creating a task chain polling the header
    for completion.

    """
    from celery.canvas import subtask
    from celery.exceptions import ChordError
    from celery.result import from_serializable

    default_propagate = app.conf.CELERY_CHORD_PROPAGATES

    @app.task(name='celery.chord_unlock', max_retries=None,
              default_retry_delay=1, ignore_result=True, _force_evaluate=True)
    def unlock_chord(group_id, callback, interval=None, propagate=None,
                     max_retries=None, result=None,
                     Result=app.AsyncResult, GroupResult=app.GroupResult,
                     from_serializable=from_serializable):
        # if propagate is disabled exceptions raised by chord tasks
        # will be sent as part of the result list to the chord callback.
        # Since 3.1 propagate will be enabled by default, and instead
        # the chord callback changes state to FAILURE with the
        # exception set to ChordError.
        propagate = default_propagate if propagate is None else propagate
        if interval is None:
            interval = unlock_chord.default_retry_delay

        # check if the task group is ready, and if so apply the callback.
        deps = GroupResult(
            group_id,
            [from_serializable(r, app=app) for r in result],
        )
        j = deps.join_native if deps.supports_native_join else deps.join

        if deps.ready():
            callback = subtask(callback)
            try:
                ret = j(propagate=propagate)
            except Exception as exc:
                try:
                    culprit = next(deps._failed_join_report())
                    reason = 'Dependency {0.id} raised {1!r}'.format(
                        culprit, exc,
                    )
                except StopIteration:
                    reason = repr(exc)
                app._tasks[callback.task].backend.fail_from_current_stack(
                    callback.id, exc=ChordError(reason),
                )
            else:
                try:
                    callback.delay(ret)
                except Exception as exc:
                    app._tasks[callback.task].backend.fail_from_current_stack(
                        callback.id,
                        exc=ChordError('Callback error: {0!r}'.format(exc)),
                    )
        else:
            return unlock_chord.retry(countdown=interval,
                                      max_retries=max_retries)
    return unlock_chord
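
# Usage sketch (``add`` and ``tsum`` are assumed example tasks): with a
# backend lacking native chord support, ``celery.chord_unlock`` retries
# itself every ``interval`` seconds (default_retry_delay=1) until the
# header group below is ready, then delivers the joined results to the
# callback:
#
#     from celery import chord
#     res = chord(add.s(i, i) for i in range(10))(tsum.s())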

@shared_task
def add_map_task(app):
    from celery.canvas import subtask

    @app.task(name='celery.map', _force_evaluate=True)
    def xmap(task, it):
        task = subtask(task).type
        return [task(item) for item in it]
    return xmap
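
# Usage sketch (assumes a one-argument ``double`` example task);
# ``Task.map`` builds a signature of this ``celery.map`` task:
#
#     res = double.map(range(10)).delay()
#     # applies double() to each item in a single worker invocation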

@shared_task
def add_starmap_task(app):
    from celery.canvas import subtask

    @app.task(name='celery.starmap', _force_evaluate=True)
    def xstarmap(task, it):
        task = subtask(task).type
        return [task(*item) for item in it]
    return xstarmap
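
# Usage sketch (assumes a two-argument ``add`` example task);
# ``Task.starmap`` builds a signature of this ``celery.starmap`` task:
#
#     res = add.starmap(zip(range(10), range(10))).delay()
#     # applies add(0, 0), add(1, 1), ... in a single worker invocation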

@shared_task
def add_chunk_task(app):
    from celery.canvas import chunks as _chunks

    @app.task(name='celery.chunks', _force_evaluate=True)
    def chunks(task, it, n):
        return _chunks.apply_chunks(task, it, n)
    return chunks
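
# Usage sketch (assumes a two-argument ``add`` example task): split 100
# calls into 10 ``celery.chunks`` tasks of 10 items each:
#
#     res = add.chunks(zip(range(100), range(100)), 10).apply_async()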

@shared_task
def add_group_task(app):
    _app = app
    from celery.canvas import maybe_subtask, subtask
    from celery.result import from_serializable

    class Group(app.Task):
        app = _app
        name = 'celery.group'
        accept_magic_kwargs = False

        def run(self, tasks, result, group_id, partial_args):
            app = self.app
            result = from_serializable(result, app)
            # any partial args are added to all tasks in the group
            taskit = (subtask(task).clone(partial_args)
                      for task in tasks)
            if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
                return app.GroupResult(
                    result.id,
                    [stask.apply(group_id=group_id) for stask in taskit],
                )
            with app.producer_or_acquire() as pub:
                [stask.apply_async(group_id=group_id, publisher=pub,
                                   add_to_parent=False) for stask in taskit]
            parent = get_current_worker_task()
            if parent:
                parent.request.children.append(result)
            return result

        def prepare(self, options, tasks, args, **kwargs):
            AsyncResult = self.AsyncResult
            options['group_id'] = group_id = (
                options.setdefault('task_id', uuid()))

            def prepare_member(task):
                task = maybe_subtask(task)
                opts = task.options
                opts['group_id'] = group_id
                try:
                    tid = opts['task_id']
                except KeyError:
                    tid = opts['task_id'] = uuid()
                return task, AsyncResult(tid)

            try:
                tasks, res = list(zip(
                    *[prepare_member(task) for task in tasks]
                ))
            except ValueError:  # tasks empty
                tasks, res = [], []
            return (tasks, self.app.GroupResult(group_id, res), group_id, args)

        def apply_async(self, partial_args=(), kwargs={}, **options):
            if self.app.conf.CELERY_ALWAYS_EAGER:
                return self.apply(partial_args, kwargs, **options)
            tasks, result, gid, args = self.prepare(
                options, args=partial_args, **kwargs
            )
            super(Group, self).apply_async((
                list(tasks), result.serializable(), gid, args), **options
            )
            return result

        def apply(self, args=(), kwargs={}, **options):
            return super(Group, self).apply(
                self.prepare(options, args=args, **kwargs),
                **options).get()
    return Group
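
# Usage sketch (``add`` is an assumed example task); the canvas ``group``
# primitive dispatches to this ``celery.group`` task when sent to a worker:
#
#     from celery import group
#     res = group(add.s(i, i) for i in range(10)).apply_async()
#     res.get()  # -> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]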

@shared_task
def add_chain_task(app):
    from celery.canvas import Signature, chord, group, maybe_subtask
    _app = app

    class Chain(app.Task):
        app = _app
        name = 'celery.chain'
        accept_magic_kwargs = False

        def prepare_steps(self, args, tasks):
            steps = deque(tasks)
            next_step = prev_task = prev_res = None
            tasks, results = [], []
            i = 0
            while steps:
                # First task gets partial args from the chain.
                task = maybe_subtask(steps.popleft())
                task = task.clone() if i else task.clone(args)
                res = task.freeze()
                i += 1

                if isinstance(task, group):
                    # automatically upgrade group(..) | s to chord(group, s)
                    try:
                        next_step = steps.popleft()
                        # for chords we freeze by pretending it's a normal
                        # task instead of a group.
                        res = Signature.freeze(task)
                        task = chord(task, body=next_step, task_id=res.task_id)
                    except IndexError:
                        pass  # no callback, so keep as group

                if prev_task:
                    # link previous task to this task.
                    prev_task.link(task)
                    # set the results parent attribute.
                    res.parent = prev_res

                if not isinstance(prev_task, chord):
                    results.append(res)
                tasks.append(task)
                prev_task, prev_res = task, res
            return tasks, results

        def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
                        task_id=None, link=None, link_error=None, **options):
            if self.app.conf.CELERY_ALWAYS_EAGER:
                return self.apply(args, kwargs, **options)
            options.pop('publisher', None)
            tasks, results = self.prepare_steps(args, kwargs['tasks'])
            result = results[-1]
            if group_id:
                tasks[-1].set(group_id=group_id)
            if chord:
                tasks[-1].set(chord=chord)
            if task_id:
                tasks[-1].set(task_id=task_id)
                result = tasks[-1].type.AsyncResult(task_id)
            # make sure we can do a link() and link_error() on a chain object.
            if link:
                tasks[-1].set(link=link)
            # and if any task in the chain fails, call the errbacks
            if link_error:
                for task in tasks:
                    task.set(link_error=link_error)
            tasks[0].apply_async()
            return result

        def apply(self, args=(), kwargs={}, subtask=maybe_subtask, **options):
            last, fargs = None, args  # fargs passed to first task only
            for task in kwargs['tasks']:
                res = subtask(task).clone(fargs).apply(last and (last.get(), ))
                res.parent, last, fargs = last, res, None
            return last
    return Chain
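
# Usage sketch (``add`` is an assumed example task); chains are normally
# built with ``|``, and each step is linked to the next as arranged by
# prepare_steps() above:
#
#     from celery import chain
#     res = chain(add.s(4, 4) | add.s(8) | add.s(16))()
#     res.get()         # -> 32
#     res.parent.get()  # -> 16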

@shared_task
def add_chord_task(app):
    """Every chord is executed in a dedicated task, so that the chord
    can be used as a subtask; this generates the task responsible
    for that."""
    from celery import group
    from celery.canvas import maybe_subtask
    _app = app
    default_propagate = app.conf.CELERY_CHORD_PROPAGATES

    class Chord(app.Task):
        app = _app
        name = 'celery.chord'
        accept_magic_kwargs = False
        ignore_result = False

        def run(self, header, body, partial_args=(), interval=None,
                countdown=1, max_retries=None, propagate=None,
                eager=False, **kwargs):
            propagate = default_propagate if propagate is None else propagate
            group_id = uuid()
            AsyncResult = self.app.AsyncResult
            prepare_member = self._prepare_member

            # - convert back to group if serialized
            tasks = header.tasks if isinstance(header, group) else header
            header = group([maybe_subtask(s).clone() for s in tasks])
            # - eager applies the group inline
            if eager:
                return header.apply(args=partial_args, task_id=group_id)

            results = [AsyncResult(prepare_member(task, body, group_id))
                       for task in header.tasks]

            # - fallback implementations schedule the chord_unlock task here
            app.backend.on_chord_apply(group_id, body,
                                       interval=interval,
                                       countdown=countdown,
                                       max_retries=max_retries,
                                       propagate=propagate,
                                       result=results)
            # - call the header group, returning the GroupResult.
            return header(*partial_args, task_id=group_id)

        def _prepare_member(self, task, body, group_id):
            opts = task.options
            # d.setdefault would work but generating uuids is expensive
            try:
                task_id = opts['task_id']
            except KeyError:
                task_id = opts['task_id'] = uuid()
            opts.update(chord=body, group_id=group_id)
            return task_id

        def apply_async(self, args=(), kwargs={}, task_id=None,
                        group_id=None, chord=None, **options):
            if self.app.conf.CELERY_ALWAYS_EAGER:
                return self.apply(args, kwargs, **options)
            header = kwargs.pop('header')
            body = kwargs.pop('body')
            header, body = (list(maybe_subtask(header)),
                            maybe_subtask(body))
            # forward certain options to body
            if chord is not None:
                body.options['chord'] = chord
            if group_id is not None:
                body.options['group_id'] = group_id
            [body.link(s) for s in options.pop('link', [])]
            [body.link_error(s) for s in options.pop('link_error', [])]
            body_result = body.freeze(task_id)
            parent = super(Chord, self).apply_async((header, body, args),
                                                    kwargs, **options)
            body_result.parent = parent
            return body_result

        def apply(self, args=(), kwargs={}, propagate=True, **options):
            body = kwargs['body']
            res = super(Chord, self).apply(args, dict(kwargs, eager=True),
                                           **options)
            return maybe_subtask(body).apply(
                args=(res.get(propagate=propagate).get(), ))
    return Chord
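
# Usage sketch (``add`` and ``tsum`` are assumed example tasks); calling
# a chord invokes this ``celery.chord`` task, which dispatches the header
# group and arranges for the body to receive the joined results:
#
#     from celery import chord
#     res = chord(add.s(i, i) for i in range(10))(tsum.s())
#     res.get()  # -> 90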