managers.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. from django.db import models
  2. from celery.registry import tasks
  3. from datetime import datetime, timedelta
  4. __all__ = ["TaskManager", "PeriodicTaskManager"]
  5. class TaskManager(models.Manager):
  6. def get_task(self, task_id):
  7. task, created = self.get_or_create(task_id=task_id)
  8. return task
  9. def is_done(self, task_id):
  10. return self.get_task(task_id).is_done
  11. def get_all_expired(self):
  12. return self.filter(date_done__lt=datetime.now() - timedelta(days=5),
  13. is_done=True)
  14. def delete_expired(self):
  15. self.get_all_expired().delete()
  16. def mark_as_done(self, task_id):
  17. task, created = self.get_or_create(task_id=task_id, defaults={
  18. "is_done": True})
  19. if not created:
  20. task.is_done = True
  21. task.save()
  22. class PeriodicTaskManager(models.Manager):
  23. def get_waiting_tasks(self):
  24. periodic_tasks = tasks.get_all_periodic()
  25. waiting = []
  26. for task_name, task in periodic_tasks.items():
  27. task_meta, created = self.get_or_create(name=task_name)
  28. # task_run.every must be a timedelta object.
  29. run_at = task_meta.last_run_at + task.run_every
  30. if datetime.now() > run_at:
  31. waiting.append(task_meta)
  32. return waiting