managers.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. """celery.managers"""
  2. from datetime import datetime
  3. from django.db import models
  4. from django.db import transaction
  5. from celery.conf import TASK_RESULT_EXPIRES
  6. class TaskManager(models.Manager):
  7. """Manager for :class:`celery.models.Task` models."""
  8. def get_task(self, task_id, exception_retry_count=1):
  9. """Get task meta for task by ``task_id``.
  10. :keyword exception_retry_count: How many times to retry with
  11. transaction rollback on exception. 1 by default: we assume
  12. the pessimistic case when we get race condition in which
  13. task is created by other process during get_or_create
  14. """
  15. try:
  16. task, created = self.get_or_create(task_id=task_id)
  17. except Exception, exc:
  18. # depending on the database backend we can get various exceptions,
  19. # so we catch every exception type
  20. if exception_retry_count > 0:
  21. transaction.rollback_unless_managed()
  22. return self.get_task(task_id, exception_retry_count-1)
  23. else:
  24. raise
  25. return task
  26. def is_successful(self, task_id):
  27. """Returns ``True`` if the task was executed successfully."""
  28. return self.get_task(task_id).status == "SUCCESS"
  29. def get_all_expired(self):
  30. """Get all expired task results."""
  31. return self.filter(date_done__lt=datetime.now() - TASK_RESULT_EXPIRES)
  32. def delete_expired(self):
  33. """Delete all expired task results."""
  34. self.get_all_expired().delete()
  35. def store_result(self, task_id, result, status, traceback=None,
  36. exception_retry_count=2):
  37. """Store the result and status of a task.
  38. :param task_id: task id
  39. :param result: The return value of the task, or an exception
  40. instance raised by the task.
  41. :param status: Task status. See
  42. :meth:`celery.result.AsyncResult.get_status` for a list of
  43. possible status values.
  44. :keyword traceback: The traceback at the point of exception (if the
  45. task failed).
  46. :keyword exception_retry_count: How many times to retry with
  47. transaction rollback on exception. 2 by default: we assume
  48. the pessimistic case when task execution by itself could
  49. leave broken transaction, and during second try we get
  50. race condition in which task is created by other process
  51. during get_or_create
  52. """
  53. try:
  54. task, created = self.get_or_create(task_id=task_id, defaults={
  55. "status": status,
  56. "result": result,
  57. "traceback": traceback})
  58. if not created:
  59. task.status = status
  60. task.result = result
  61. task.traceback = traceback
  62. task.save()
  63. except Exception, exc:
  64. # depending on the database backend we can get various exceptions.
  65. # for excample, psycopg2 raises an exception if some operation
  66. # breaks transaction, and saving task result won't be possible
  67. # until we rollback transaction
  68. if exception_retry_count > 0:
  69. transaction.rollback_unless_managed()
  70. self.store_result(task_id, result, status, traceback, exception_retry_count-1)
  71. else:
  72. raise