decorators.py 627 B

1234567891011121314151617181920212223242526272829
  1. """
  2. Decorators
  3. """
  4. from celery.app import app_or_default
  5. from celery.task.base import PeriodicTask
  def task(*args, **kwargs):
      """Create a task through the app returned by ``app_or_default()``.

      All positional and keyword arguments are passed through unchanged to
      that app's ``task`` method, and its result is returned as-is.
      """
      app = app_or_default()
      return app.task(*args, **kwargs)
  def periodic_task(**options):
      """Task decorator to create a periodic task.

      The keyword ``options`` are forwarded to :func:`task`. ``base`` is set
      to ``PeriodicTask`` unless the caller passes a ``base`` of their own.

      Example task, scheduling a task once every day:

      .. code-block:: python

          from datetime import timedelta

          @periodic_task(run_every=timedelta(days=1))
          def cronjob(**kwargs):
              logger = cronjob.get_logger(**kwargs)
              logger.warn("Task running...")

      """
      task_options = {"base": PeriodicTask}
      # Caller-supplied keys win over the default, including "base".
      task_options.update(options)
      return task(**task_options)