database.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. from celery import states
  2. from celery.models import TaskMeta, TaskSetMeta
  3. from celery.backends.base import BaseDictBackend
  4. class DatabaseBackend(BaseDictBackend):
  5. """The database backends. Using Django models to store task metadata."""
  6. def _store_result(self, task_id, result, status, traceback=None):
  7. """Store return value and status of an executed task."""
  8. TaskMeta.objects.store_result(task_id, result, status,
  9. traceback=traceback)
  10. return result
  11. def _save_taskset(self, taskset_id, result):
  12. """Store the result of an executed taskset."""
  13. TaskSetMeta.objects.store_result(taskset_id, result)
  14. return result
  15. def _get_task_meta_for(self, task_id):
  16. """Get task metadata for a task by id."""
  17. meta = TaskMeta.objects.get_task(task_id)
  18. if meta:
  19. meta = meta.to_dict()
  20. return meta
  21. def _restore_taskset(self, taskset_id):
  22. """Get taskset metadata for a taskset by id."""
  23. meta = TaskSetMeta.objects.restore_taskset(taskset_id)
  24. if meta:
  25. meta = meta.to_dict()
  26. return meta
  27. def cleanup(self):
  28. """Delete expired metadata."""
  29. TaskMeta.objects.delete_expired()
  30. TaskSetMeta.objects.delete_expired()