"""Decorators"""
from celery.app import app_or_default
from celery.task.base import PeriodicTask


def task(*args, **kwargs):
    """Invoke the ``task`` method of the app given by ``app_or_default()``.

    Every positional and keyword argument is forwarded unchanged, and
    whatever that method returns is handed back to the caller.
    """
    app = app_or_default()
    return app.task(*args, **kwargs)


def periodic_task(**options):
    """Task decorator for declaring a periodic task.

    The keyword ``options`` are passed on to :func:`task`, with ``base``
    defaulting to :class:`PeriodicTask` unless the caller supplies its
    own ``base``.

    Example: scheduling a task to run once every day:

    .. code-block:: python

        from datetime import timedelta

        @periodic_task(run_every=timedelta(days=1))
        def cronjob(**kwargs):
            logger = cronjob.get_logger(**kwargs)
            logger.warn("Task running...")

    """
    # Seed the default first so an explicit ``base`` in options wins.
    merged = {"base": PeriodicTask}
    merged.update(options)
    return task(**merged)