managers.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. """celery.managers"""
  2. from django.db import models
  3. from celery.registry import tasks
  4. from datetime import datetime, timedelta
  5. import random
  6. # Random offset, drawn once at import time, that get_waiting_tasks adds to task.run_every.
  7. # NOTE(review): random.vonmisesvariate is documented to return a value between 0 and 2*pi, so the drift looks non-negative (timedelta would accept negative seconds as well) -- confirm.
  8. SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
  9. class TaskManager(models.Manager):
  10. """Manager for :class:`celery.models.Task` models."""
  11. def get_task(self, task_id):
  12. """Get task meta for task by ``task_id``."""
  13. task, created = self.get_or_create(task_id=task_id)
  14. return task
  15. def is_done(self, task_id):
  16. """Returns ``True`` if the task was executed successfully."""
  17. return self.get_task(task_id).status == "DONE"
  18. def get_all_expired(self):
  19. """Get all expired task results."""
  20. # TODO Make the timedelta configurable
  21. return self.filter(date_done__lt=datetime.now() - timedelta(days=5))
  22. def delete_expired(self):
  23. """Delete all expired task results."""
  24. self.get_all_expired().delete()
  25. def store_result(self, task_id, result, status):
  26. """Store the result and status of a task.
  27. :param task_id: task id
  28. :param result: The return value of the task, or an exception
  29. instance raised by the task.
  30. :param status: Task status. See
  31. :meth:`celery.result.AsyncResult.get_status` for a list of
  32. possible status values.
  33. """
  34. task, created = self.get_or_create(task_id=task_id, defaults={
  35. "status": status,
  36. "result": result})
  37. if not created:
  38. task.status = status
  39. task.result = result
  40. task.save()
  41. class PeriodicTaskManager(models.Manager):
  42. """Manager for :class:`celery.models.PeriodicTask` models."""
  43. def get_waiting_tasks(self):
  44. """Get all waiting periodic tasks.
  45. :returns: list of :class:`celery.models.PeriodicTaskMeta` objects.
  46. """
  47. periodic_tasks = tasks.get_all_periodic()
  48. waiting = []
  49. for task_name, task in periodic_tasks.items():
  50. task_meta, created = self.get_or_create(name=task_name)
  51. # task_run.every must be a timedelta object.
  52. run_every_drifted = task.run_every + SERVER_DRIFT
  53. run_at = task_meta.last_run_at + task.run_every
  54. if datetime.now() > run_at:
  55. waiting.append(task_meta)
  56. return waiting