123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- # -*- coding: utf-8 -*-
- """
- celery.app.builtins
- ~~~~~~~~~~~~~~~~~~~
- Built-in tasks that are always available in all
- app instances. E.g. chord, group and xmap.
- """
- from __future__ import absolute_import
- from celery._state import get_current_worker_task, connect_on_app_finalize
- from celery.utils.log import get_logger
- __all__ = []
- logger = get_logger(__name__)
@connect_on_app_finalize
def add_backend_cleanup_task(app):
    """Register the ``celery.backend_cleanup`` task on *app*.

    The task cleans up the default result backend by calling
    ``app.backend.cleanup()``.

    If the configured backend requires periodic cleanup this task is also
    automatically configured to run every day at 4am (requires
    :program:`celery beat` to be running).

    Returns the registered task.
    """
    task_options = {
        'name': 'celery.backend_cleanup',
        'shared': False,
        'lazy': False,
    }

    @app.task(**task_options)
    def backend_cleanup():
        # The backend is looked up on every run rather than captured once.
        app.backend.cleanup()

    return backend_cleanup
@connect_on_app_finalize
def add_accumulate_task(app):
    """Register the ``celery.accumulate`` task on *app*.

    This task is used by Task.replace when replacing a task with
    a group, to "collect" results.

    Returns the registered task, like the other built-in task
    registrars in this module.
    """
    @app.task(bind=True, name='celery.accumulate', shared=False, lazy=False)
    def accumulate(self, *args, **kwargs):
        # With an ``index`` keyword only that positional argument is
        # returned; without it (or with ``index=None``) the whole
        # tuple of positional arguments is returned.
        index = kwargs.get('index')
        return args[index] if index is not None else args

    return accumulate
@connect_on_app_finalize
def add_unlock_chord_task(app):
    """Register the ``celery.chord_unlock`` task on *app*.

    This task is used by result backends without native chord support.
    It joins chords by creating a task chain polling the header for
    completion.

    Returns the registered task.
    """
    from celery.canvas import maybe_signature
    from celery.exceptions import ChordError
    from celery.result import allow_join_result, result_from_tuple

    # Read once at registration time; used when the caller does not pass
    # an explicit ``propagate`` value.
    default_propagate = app.conf.CELERY_CHORD_PROPAGATES

    @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
              default_retry_delay=1, ignore_result=True, lazy=False, bind=True)
    def unlock_chord(self, group_id, callback, interval=None, propagate=None,
                     max_retries=None, result=None,
                     Result=app.AsyncResult, GroupResult=app.GroupResult,
                     result_from_tuple=result_from_tuple):
        """Apply *callback* once every result in the chord header is ready.

        :param group_id: id used to build the header ``GroupResult``.
        :param callback: chord body, converted with ``maybe_signature``.
        :param interval: retry countdown; defaults to
            ``self.default_retry_delay`` when ``None``.
        :param propagate: forwarded to the join call; defaults to the
            ``CELERY_CHORD_PROPAGATES`` setting when ``None``.
        :param max_retries: forwarded to ``self.retry``.
        :param result: iterable of serialized header results, each
            converted with ``result_from_tuple``.
        :param Result: accepted for compatibility; not used in the body.
        :param GroupResult: class used to build the header result.
        :param result_from_tuple: deserializer for the header results.

        Retries itself (via ``self.retry``) while the header is not ready
        or while checking readiness raises.
        """
        # if propagate is disabled exceptions raised by chord tasks
        # will be sent as part of the result list to the chord callback.
        # Since 3.1 propagate will be enabled by default, and instead
        # the chord callback changes state to FAILURE with the
        # exception set to ChordError.
        propagate = default_propagate if propagate is None else propagate
        if interval is None:
            interval = self.default_retry_delay

        # check if the task group is ready, and if so apply the callback.
        # The callback is converted exactly once here; it is needed both
        # for applying it and for reporting errors against it.
        callback = maybe_signature(callback, app)
        deps = GroupResult(
            group_id,
            [result_from_tuple(r, app=app) for r in result],
            app=app,
        )
        join = deps.join_native if deps.supports_native_join else deps.join

        try:
            ready = deps.ready()
        except Exception as exc:
            # Failing to query readiness is treated like "not ready yet",
            # but the error is attached to the retry.
            raise self.retry(
                exc=exc, countdown=interval, max_retries=max_retries,
            )
        else:
            if not ready:
                raise self.retry(countdown=interval, max_retries=max_retries)

        try:
            with allow_join_result():
                ret = join(timeout=3.0, propagate=propagate)
        except Exception as exc:
            # Name the first failed dependency when one is reported,
            # otherwise fall back to the exception itself.
            try:
                culprit = next(deps._failed_join_report())
                reason = 'Dependency {0.id} raised {1!r}'.format(
                    culprit, exc,
                )
            except StopIteration:
                reason = repr(exc)
            logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
            app.backend.chord_error_from_stack(callback,
                                               ChordError(reason))
        else:
            try:
                callback.delay(ret)
            except Exception as exc:
                logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
                app.backend.chord_error_from_stack(
                    callback,
                    exc=ChordError('Callback error: {0!r}'.format(exc)),
                )

    return unlock_chord
@connect_on_app_finalize
def add_map_task(app):
    """Register the ``celery.map`` task on *app* and return it."""
    from celery.canvas import signature

    @app.task(name='celery.map', shared=False, lazy=False)
    def xmap(task, it):
        # Resolve the signature to its task type, then call that type
        # with every item in turn as its single argument.
        target = signature(task, app=app).type
        results = []
        for element in it:
            results.append(target(element))
        return results

    return xmap
@connect_on_app_finalize
def add_starmap_task(app):
    """Register the ``celery.starmap`` task on *app* and return it."""
    from celery.canvas import signature

    @app.task(name='celery.starmap', shared=False, lazy=False)
    def xstarmap(task, it):
        # Resolve the signature to its task type, then call that type
        # with every item unpacked as positional arguments.
        target = signature(task, app=app).type
        results = []
        for arguments in it:
            results.append(target(*arguments))
        return results

    return xstarmap
@connect_on_app_finalize
def add_chunk_task(app):
    """Register the ``celery.chunks`` task on *app* and return it."""
    from celery.canvas import chunks as chunks_signature

    @app.task(name='celery.chunks', shared=False, lazy=False)
    def chunks(task, it, n):
        # All of the work is delegated to the canvas ``chunks`` class.
        outcome = chunks_signature.apply_chunks(task, it, n)
        return outcome

    return chunks
@connect_on_app_finalize
def add_group_task(app):
    """Register the ``celery.group`` task on *app* and return it.

    No longer used, but here for backwards compatibility.
    """
    from celery.canvas import maybe_signature
    from celery.result import result_from_tuple

    @app.task(name='celery.group', bind=True, shared=False, lazy=False)
    def group(self, tasks, result, group_id, partial_args, add_to_parent=True):
        """Apply every task in *tasks* as part of the group *group_id*.

        :param tasks: iterable of signatures (or their serialized form).
        :param result: serialized group result, restored with
            ``result_from_tuple`` and returned.
        :param group_id: group id passed to every ``apply_async`` call.
        :param partial_args: arguments added to every task via ``clone``.
        :param add_to_parent: when true and a current worker task exists,
            the restored result is added to that task's trail.
        """
        app = self.app
        result = result_from_tuple(result, app)
        # any partial args are added to all tasks in the group
        taskit = (maybe_signature(task, app=app).clone(partial_args)
                  for task in tasks)
        # One producer is shared by all publishes.  The members are applied
        # with add_to_parent=False; only the group result as a whole is
        # added to the parent's trail below.
        with app.producer_or_acquire() as producer:
            for stask in taskit:
                stask.apply_async(group_id=group_id, producer=producer,
                                  add_to_parent=False)
        parent = get_current_worker_task()
        if add_to_parent and parent:
            parent.add_trail(result)
        return result

    return group
@connect_on_app_finalize
def add_chain_task(app):
    """Register the ``celery.chain`` placeholder task on *app*.

    No longer used, but here for backwards compatibility.
    Returns the registered task.
    """
    register = app.task(name='celery.chain', shared=False, lazy=False)

    @register
    def chain(*args, **kwargs):
        # Only the task name is provided; running the task always fails.
        raise NotImplementedError('chain is not a real task')

    return chain
@connect_on_app_finalize
def add_chord_task(app):
    """Register the ``celery.chord`` task on *app* and return it.

    No longer used, but here for backwards compatibility.
    """
    from celery import group, chord as _chord
    from celery.canvas import maybe_signature

    @app.task(name='celery.chord', bind=True, ignore_result=False,
              shared=False, lazy=False)
    def chord(self, header, body, partial_args=(), interval=None,
              countdown=1, max_retries=None, propagate=None,
              eager=False, **kwargs):
        app = self.app
        # - convert back to group if serialized
        if isinstance(header, group):
            members = header.tasks
        else:
            members = header
        signatures = [maybe_signature(member, app=app) for member in members]
        header = group(signatures, app=app)
        body = maybe_signature(body, app=app)
        # ``eager`` is accepted but not forwarded to ``run``.
        return _chord(header, body).run(
            header, body, partial_args, app, interval,
            countdown, max_retries, propagate, **kwargs
        )

    return chord
|