managers.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. """celery.managers"""
  2. from datetime import datetime
  3. from django.db import models
  4. from django.db import transaction
  5. from celery.conf import TASK_RESULT_EXPIRES
  6. class TaskManager(models.Manager):
  7. """Manager for :class:`celery.models.Task` models."""
  8. def get_task(self, task_id):
  9. """Get task meta for task by ``task_id``."""
  10. task, created = self.get_or_create(task_id=task_id)
  11. return task
  12. def is_successful(self, task_id):
  13. """Returns ``True`` if the task was executed successfully."""
  14. return self.get_task(task_id).status == "SUCCESS"
  15. def get_all_expired(self):
  16. """Get all expired task results."""
  17. return self.filter(date_done__lt=datetime.now() - TASK_RESULT_EXPIRES)
  18. def delete_expired(self):
  19. """Delete all expired task results."""
  20. self.get_all_expired().delete()
  21. def store_result(self, task_id, result, status, traceback=None,
  22. exception_retry=True):
  23. """Store the result and status of a task.
  24. :param task_id: task id
  25. :param result: The return value of the task, or an exception
  26. instance raised by the task.
  27. :param status: Task status. See
  28. :meth:`celery.result.AsyncResult.get_status` for a list of
  29. possible status values.
  30. :keyword traceback: The traceback at the point of exception (if the
  31. task failed).
  32. :keyword exception_retry: If True, we try a single retry with
  33. transaction rollback on exception
  34. """
  35. try:
  36. task, created = self.get_or_create(task_id=task_id, defaults={
  37. "status": status,
  38. "result": result,
  39. "traceback": traceback})
  40. if not created:
  41. task.status = status
  42. task.result = result
  43. task.traceback = traceback
  44. task.save()
  45. except Exception, exc:
  46. # depending on the database backend we can get various exceptions.
  47. # for excample, psycopg2 raises an exception if some operation
  48. # breaks transaction, and saving task result won't be possible
  49. # until we rollback transaction
  50. if exception_retry:
  51. transaction.rollback_unless_managed()
  52. self.store_result(task_id, result, status, traceback, False)
  53. else:
  54. raise