123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- # -*- coding: utf-8 -*-
- """Built-in Tasks.
- The built-in tasks are always available in all app instances.
- """
- from __future__ import absolute_import, unicode_literals
- from celery._state import connect_on_app_finalize
- from celery.utils.log import get_logger
- __all__ = ()
- logger = get_logger(__name__)
- @connect_on_app_finalize
- def add_backend_cleanup_task(app):
- """Task used to clean up expired results.
- If the configured backend requires periodic cleanup this task is also
- automatically configured to run every day at 4am (requires
- :program:`celery beat` to be running).
- """
- @app.task(name='celery.backend_cleanup', shared=False, lazy=False)
- def backend_cleanup():
- app.backend.cleanup()
- return backend_cleanup
- @connect_on_app_finalize
- def add_accumulate_task(app):
- """Task used by Task.replace when replacing task with group."""
- @app.task(bind=True, name='celery.accumulate', shared=False, lazy=False)
- def accumulate(self, *args, **kwargs):
- index = kwargs.get('index')
- return args[index] if index is not None else args
- return accumulate
- @connect_on_app_finalize
- def add_unlock_chord_task(app):
- """Task used by result backends without native chord support.
- Will joins chord by creating a task chain polling the header
- for completion.
- """
- from celery.canvas import maybe_signature
- from celery.exceptions import ChordError
- from celery.result import allow_join_result, result_from_tuple
- @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
- default_retry_delay=1, ignore_result=True, lazy=False, bind=True)
- def unlock_chord(self, group_id, callback, interval=None,
- max_retries=None, result=None,
- Result=app.AsyncResult, GroupResult=app.GroupResult,
- result_from_tuple=result_from_tuple, **kwargs):
- if interval is None:
- interval = self.default_retry_delay
- # check if the task group is ready, and if so apply the callback.
- callback = maybe_signature(callback, app)
- deps = GroupResult(
- group_id,
- [result_from_tuple(r, app=app) for r in result],
- app=app,
- )
- j = deps.join_native if deps.supports_native_join else deps.join
- try:
- ready = deps.ready()
- except Exception as exc:
- raise self.retry(
- exc=exc, countdown=interval, max_retries=max_retries,
- )
- else:
- if not ready:
- raise self.retry(countdown=interval, max_retries=max_retries)
- callback = maybe_signature(callback, app=app)
- try:
- with allow_join_result():
- ret = j(timeout=3.0, propagate=True)
- except Exception as exc: # pylint: disable=broad-except
- try:
- culprit = next(deps._failed_join_report())
- reason = 'Dependency {0.id} raised {1!r}'.format(culprit, exc)
- except StopIteration:
- reason = repr(exc)
- logger.exception('Chord %r raised: %r', group_id, exc)
- app.backend.chord_error_from_stack(callback, ChordError(reason))
- else:
- try:
- callback.delay(ret)
- except Exception as exc: # pylint: disable=broad-except
- logger.exception('Chord %r raised: %r', group_id, exc)
- app.backend.chord_error_from_stack(
- callback,
- exc=ChordError('Callback error: {0!r}'.format(exc)),
- )
- return unlock_chord
- @connect_on_app_finalize
- def add_map_task(app):
- from celery.canvas import signature
- @app.task(name='celery.map', shared=False, lazy=False)
- def xmap(task, it):
- task = signature(task, app=app).type
- return [task(item) for item in it]
- return xmap
- @connect_on_app_finalize
- def add_starmap_task(app):
- from celery.canvas import signature
- @app.task(name='celery.starmap', shared=False, lazy=False)
- def xstarmap(task, it):
- task = signature(task, app=app).type
- return [task(*item) for item in it]
- return xstarmap
- @connect_on_app_finalize
- def add_chunk_task(app):
- from celery.canvas import chunks as _chunks
- @app.task(name='celery.chunks', shared=False, lazy=False)
- def chunks(task, it, n):
- return _chunks.apply_chunks(task, it, n)
- return chunks
- @connect_on_app_finalize
- def add_group_task(app):
- """No longer used, but here for backwards compatibility."""
- from celery.canvas import maybe_signature
- from celery.result import result_from_tuple
- @app.task(name='celery.group', bind=True, shared=False, lazy=False)
- def group(self, tasks, result, group_id, partial_args, add_to_parent=True):
- app = self.app
- result = result_from_tuple(result, app)
- # any partial args are added to all tasks in the group
- taskit = (maybe_signature(task, app=app).clone(partial_args)
- for i, task in enumerate(tasks))
- with app.producer_or_acquire() as producer:
- [stask.apply_async(group_id=group_id, producer=producer,
- add_to_parent=False) for stask in taskit]
- parent = app.current_worker_task
- if add_to_parent and parent:
- parent.add_trail(result)
- return result
- return group
- @connect_on_app_finalize
- def add_chain_task(app):
- """No longer used, but here for backwards compatibility."""
- @app.task(name='celery.chain', shared=False, lazy=False)
- def chain(*args, **kwargs):
- raise NotImplementedError('chain is not a real task')
- return chain
- @connect_on_app_finalize
- def add_chord_task(app):
- """No longer used, but here for backwards compatibility."""
- from celery import group, chord as _chord
- from celery.canvas import maybe_signature
- @app.task(name='celery.chord', bind=True, ignore_result=False,
- shared=False, lazy=False)
- def chord(self, header, body, partial_args=(), interval=None,
- countdown=1, max_retries=None, eager=False, **kwargs):
- app = self.app
- # - convert back to group if serialized
- tasks = header.tasks if isinstance(header, group) else header
- header = group([
- maybe_signature(s, app=app) for s in tasks
- ], app=self.app)
- body = maybe_signature(body, app=app)
- ch = _chord(header, body)
- return ch.run(header, body, partial_args, app, interval,
- countdown, max_retries, **kwargs)
- return chord
|