database.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. """celery.backends.database"""
  2. from celery.models import TaskMeta, PeriodicTaskMeta
  3. from celery.backends.base import BaseBackend
  4. class Backend(BaseBackend):
  5. """The database backends. Using Django models to store task metadata."""
  6. capabilities = ["ResultStore", "PeriodicStatus"]
  7. def __init__(self, *args, **kwargs):
  8. super(Backend, self).__init__(*args, **kwargs)
  9. self._cache = {}
  10. def run_periodic_tasks(self):
  11. """Run all waiting periodic tasks."""
  12. waiting_tasks = PeriodicTaskMeta.objects.get_waiting_tasks()
  13. for waiting_task in waiting_tasks:
  14. waiting_task.delay()
  15. def store_result(self, task_id, result, status):
  16. """Mark task as done (executed)."""
  17. if status == "DONE":
  18. result = self.prepare_result(result)
  19. elif status == "FAILURE":
  20. result = self.prepare_exception(result)
  21. TaskMeta.objects.store_result(task_id, result, status)
  22. return result
  23. def is_done(self, task_id):
  24. """Returns ``True`` if task with ``task_id`` has been executed."""
  25. return self.get_status(task_id) == "DONE"
  26. def get_status(self, task_id):
  27. """Get the status of a task."""
  28. return self._get_task_meta_for(task_id).status
  29. def get_result(self, task_id):
  30. """Get the result for a task."""
  31. meta = self._get_task_meta_for(task_id)
  32. if meta.status == "FAILURE":
  33. return self.exception_to_python(meta.result)
  34. else:
  35. return meta.result
  36. def _get_task_meta_for(self, task_id):
  37. """Get task metadata for a task by id."""
  38. if task_id in self._cache:
  39. return self._cache[task_id]
  40. meta = TaskMeta.objects.get_task(task_id)
  41. if meta.status == "DONE":
  42. self._cache[task_id] = meta
  43. return meta
  44. def cleanup(self):
  45. """Delete expired metadata."""
  46. TaskMeta.objects.delete_expired()