builtins.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. # -*- coding: utf-8 -*-
  2. """Built-in Tasks.
  3. The built-in tasks are always available in all app instances.
  4. """
  5. from __future__ import absolute_import, unicode_literals
  6. from celery._state import connect_on_app_finalize
  7. from celery.utils.log import get_logger
  8. __all__ = ()
  9. logger = get_logger(__name__)
  10. @connect_on_app_finalize
  11. def add_backend_cleanup_task(app):
  12. """Task used to clean up expired results.
  13. If the configured backend requires periodic cleanup this task is also
  14. automatically configured to run every day at 4am (requires
  15. :program:`celery beat` to be running).
  16. """
  17. @app.task(name='celery.backend_cleanup', shared=False, lazy=False)
  18. def backend_cleanup():
  19. app.backend.cleanup()
  20. return backend_cleanup
  21. @connect_on_app_finalize
  22. def add_accumulate_task(app):
  23. """Task used by Task.replace when replacing task with group."""
  24. @app.task(bind=True, name='celery.accumulate', shared=False, lazy=False)
  25. def accumulate(self, *args, **kwargs):
  26. index = kwargs.get('index')
  27. return args[index] if index is not None else args
  28. return accumulate
  29. @connect_on_app_finalize
  30. def add_unlock_chord_task(app):
  31. """Task used by result backends without native chord support.
  32. Will joins chord by creating a task chain polling the header
  33. for completion.
  34. """
  35. from celery.canvas import maybe_signature
  36. from celery.exceptions import ChordError
  37. from celery.result import allow_join_result, result_from_tuple
  38. @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
  39. default_retry_delay=1, ignore_result=True, lazy=False, bind=True)
  40. def unlock_chord(self, group_id, callback, interval=None,
  41. max_retries=None, result=None,
  42. Result=app.AsyncResult, GroupResult=app.GroupResult,
  43. result_from_tuple=result_from_tuple, **kwargs):
  44. if interval is None:
  45. interval = self.default_retry_delay
  46. # check if the task group is ready, and if so apply the callback.
  47. callback = maybe_signature(callback, app)
  48. deps = GroupResult(
  49. group_id,
  50. [result_from_tuple(r, app=app) for r in result],
  51. app=app,
  52. )
  53. j = deps.join_native if deps.supports_native_join else deps.join
  54. try:
  55. ready = deps.ready()
  56. except Exception as exc:
  57. raise self.retry(
  58. exc=exc, countdown=interval, max_retries=max_retries,
  59. )
  60. else:
  61. if not ready:
  62. raise self.retry(countdown=interval, max_retries=max_retries)
  63. callback = maybe_signature(callback, app=app)
  64. try:
  65. with allow_join_result():
  66. ret = j(timeout=3.0, propagate=True)
  67. except Exception as exc: # pylint: disable=broad-except
  68. try:
  69. culprit = next(deps._failed_join_report())
  70. reason = 'Dependency {0.id} raised {1!r}'.format(culprit, exc)
  71. except StopIteration:
  72. reason = repr(exc)
  73. logger.exception('Chord %r raised: %r', group_id, exc)
  74. app.backend.chord_error_from_stack(callback, ChordError(reason))
  75. else:
  76. try:
  77. callback.delay(ret)
  78. except Exception as exc: # pylint: disable=broad-except
  79. logger.exception('Chord %r raised: %r', group_id, exc)
  80. app.backend.chord_error_from_stack(
  81. callback,
  82. exc=ChordError('Callback error: {0!r}'.format(exc)),
  83. )
  84. return unlock_chord
  85. @connect_on_app_finalize
  86. def add_map_task(app):
  87. from celery.canvas import signature
  88. @app.task(name='celery.map', shared=False, lazy=False)
  89. def xmap(task, it):
  90. task = signature(task, app=app).type
  91. return [task(item) for item in it]
  92. return xmap
  93. @connect_on_app_finalize
  94. def add_starmap_task(app):
  95. from celery.canvas import signature
  96. @app.task(name='celery.starmap', shared=False, lazy=False)
  97. def xstarmap(task, it):
  98. task = signature(task, app=app).type
  99. return [task(*item) for item in it]
  100. return xstarmap
  101. @connect_on_app_finalize
  102. def add_chunk_task(app):
  103. from celery.canvas import chunks as _chunks
  104. @app.task(name='celery.chunks', shared=False, lazy=False)
  105. def chunks(task, it, n):
  106. return _chunks.apply_chunks(task, it, n)
  107. return chunks
  108. @connect_on_app_finalize
  109. def add_group_task(app):
  110. """No longer used, but here for backwards compatibility."""
  111. from celery.canvas import maybe_signature
  112. from celery.result import result_from_tuple
  113. @app.task(name='celery.group', bind=True, shared=False, lazy=False)
  114. def group(self, tasks, result, group_id, partial_args, add_to_parent=True):
  115. app = self.app
  116. result = result_from_tuple(result, app)
  117. # any partial args are added to all tasks in the group
  118. taskit = (maybe_signature(task, app=app).clone(partial_args)
  119. for i, task in enumerate(tasks))
  120. with app.producer_or_acquire() as producer:
  121. [stask.apply_async(group_id=group_id, producer=producer,
  122. add_to_parent=False) for stask in taskit]
  123. parent = app.current_worker_task
  124. if add_to_parent and parent:
  125. parent.add_trail(result)
  126. return result
  127. return group
  128. @connect_on_app_finalize
  129. def add_chain_task(app):
  130. """No longer used, but here for backwards compatibility."""
  131. @app.task(name='celery.chain', shared=False, lazy=False)
  132. def chain(*args, **kwargs):
  133. raise NotImplementedError('chain is not a real task')
  134. return chain
  135. @connect_on_app_finalize
  136. def add_chord_task(app):
  137. """No longer used, but here for backwards compatibility."""
  138. from celery import group, chord as _chord
  139. from celery.canvas import maybe_signature
  140. @app.task(name='celery.chord', bind=True, ignore_result=False,
  141. shared=False, lazy=False)
  142. def chord(self, header, body, partial_args=(), interval=None,
  143. countdown=1, max_retries=None, eager=False, **kwargs):
  144. app = self.app
  145. # - convert back to group if serialized
  146. tasks = header.tasks if isinstance(header, group) else header
  147. header = group([
  148. maybe_signature(s, app=app) for s in tasks
  149. ], app=self.app)
  150. body = maybe_signature(body, app=app)
  151. ch = _chord(header, body)
  152. return ch.run(header, body, partial_args, app, interval,
  153. countdown, max_retries, **kwargs)
  154. return chord