managers.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from django.db import models
  2. from celery.registry import tasks
  3. from datetime import datetime, timedelta
  4. __all__ = ["TaskManager", "PeriodicTaskManager"]
  5. class TaskManager(models.Manager):
  6. def get_task(self, task_id):
  7. task, created = self.get_or_create(task_id=task_id)
  8. return task
  9. def is_done(self, task_id):
  10. return self.get_task(task_id).is_done
  11. def get_all_expired(self):
  12. return self.filter(date_done__lt=datetime.now() - timedelta(days=5))
  13. def delete_expired(self):
  14. self.get_all_expired().delete()
  15. def mark_as_done(self, task_id):
  16. task, created = self.get_or_create(task_id=task_id, defaults={
  17. "is_done": True})
  18. if not created:
  19. task.is_done = True
  20. task.save()
  21. class PeriodicTaskManager(models.Manager):
  22. def get_waiting_tasks(self):
  23. periodic_tasks = tasks.get_all_periodic()
  24. waiting = []
  25. for task_name, task in periodic_tasks.items():
  26. task_meta, created = self.get_or_create(name=task_name)
  27. # task_run.every must be a timedelta object.
  28. run_at = task_meta.last_run_at + task.run_every
  29. if datetime.now() > run_at:
  30. waiting.append(task_meta)
  31. return waiting