database.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. from celery import states
  2. from celery.models import TaskMeta, TaskSetMeta
  3. from celery.backends.base import BaseDictBackend
  4. class DatabaseBackend(BaseDictBackend):
  5. """The database backends. Using Django models to store task metadata."""
  6. def _store_result(self, task_id, result, status, traceback=None):
  7. """Store return value and status of an executed task."""
  8. TaskMeta.objects.store_result(task_id, result, status,
  9. traceback=traceback)
  10. return result
  11. def _save_taskset(self, taskset_id, result):
  12. """Store the result of an executed taskset."""
  13. TaskSetMeta.objects.store_result(taskset_id, result)
  14. return result
  15. def _get_task_meta_for(self, task_id):
  16. """Get task metadata for a task by id."""
  17. if task_id in self._cache:
  18. return self._cache[task_id]
  19. meta = TaskMeta.objects.get_task(task_id)
  20. if meta:
  21. meta = meta.to_dict()
  22. if meta["status"] == states.SUCCESS:
  23. self._cache[task_id] = meta
  24. return meta
  25. def _restore_taskset(self, taskset_id):
  26. """Get taskset metadata for a taskset by id."""
  27. if taskset_id in self._cache:
  28. return self._cache[taskset_id]
  29. meta = TaskSetMeta.objects.restore_taskset(taskset_id)
  30. if meta:
  31. meta = self._cache[taskset_id] = meta.to_dict()
  32. return meta
  33. def cleanup(self):
  34. """Delete expired metadata."""
  35. TaskMeta.objects.delete_expired()
  36. TaskSetMeta.objects.delete_expired()