managers.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. from datetime import datetime
  2. from itertools import count
  3. from billiard.utils.functional import wraps
  4. from django.db import models
  5. from django.db import transaction
  6. from django.db.models.query import QuerySet
  7. def transaction_retry(max_retries=1):
  8. """Decorator for methods doing database operations.
  9. If the database operation fails, it will retry the operation
  10. at most ``max_retries`` times.
  11. """
  12. def _outer(fun):
  13. @wraps(fun)
  14. def _inner(*args, **kwargs):
  15. _max_retries = kwargs.pop("exception_retry_count", max_retries)
  16. for retries in count(0):
  17. try:
  18. return fun(*args, **kwargs)
  19. except Exception: # pragma: no cover
  20. # Depending on the database backend used we can experience
  21. # various exceptions. E.g. psycopg2 raises an exception
  22. # if some operation breaks the transaction, so saving
  23. # the task result won't be possible until we rollback
  24. # the transaction.
  25. if retries >= _max_retries:
  26. raise
  27. transaction.rollback_unless_managed()
  28. return _inner
  29. return _outer
  30. def update_model_with_dict(obj, fields):
  31. [setattr(obj, attr_name, attr_value)
  32. for attr_name, attr_value in fields.items()]
  33. obj.save()
  34. return obj
  35. class ExtendedQuerySet(QuerySet):
  36. def update_or_create(self, **kwargs):
  37. obj, created = self.get_or_create(**kwargs)
  38. if not created:
  39. fields = dict(kwargs.pop("defaults", {}))
  40. fields.update(kwargs)
  41. update_model_with_dict(obj, fields)
  42. return obj
  43. class ExtendedManager(models.Manager):
  44. def get_query_set(self):
  45. return ExtendedQuerySet(self.model)
  46. def update_or_create(self, **kwargs):
  47. return self.get_query_set().update_or_create(**kwargs)
  48. class ResultManager(ExtendedManager):
  49. def get_all_expired(self):
  50. """Get all expired task results."""
  51. from celery import conf
  52. expires = conf.TASK_RESULT_EXPIRES
  53. return self.filter(date_done__lt=datetime.now() - expires)
  54. def delete_expired(self):
  55. """Delete all expired taskset results."""
  56. self.get_all_expired().delete()
  57. class TaskManager(ResultManager):
  58. """Manager for :class:`celery.models.Task` models."""
  59. @transaction_retry(max_retries=1)
  60. def get_task(self, task_id):
  61. """Get task meta for task by ``task_id``.
  62. :keyword exception_retry_count: How many times to retry by
  63. transaction rollback on exception. This could theoretically
  64. happen in a race condition if another worker is trying to
  65. create the same task. The default is to retry once.
  66. """
  67. task, created = self.get_or_create(task_id=task_id)
  68. return task
  69. @transaction_retry(max_retries=2)
  70. def store_result(self, task_id, result, status, traceback=None):
  71. """Store the result and status of a task.
  72. :param task_id: task id
  73. :param result: The return value of the task, or an exception
  74. instance raised by the task.
  75. :param status: Task status. See
  76. :meth:`celery.result.AsyncResult.get_status` for a list of
  77. possible status values.
  78. :keyword traceback: The traceback at the point of exception (if the
  79. task failed).
  80. :keyword exception_retry_count: How many times to retry by
  81. transaction rollback on exception. This could theoretically
  82. happen in a race condition if another worker is trying to
  83. create the same task. The default is to retry twice.
  84. """
  85. return self.update_or_create(task_id=task_id, defaults={
  86. "status": status,
  87. "result": result,
  88. "traceback": traceback})
  89. class TaskSetManager(ResultManager):
  90. """Manager for :class:`celery.models.TaskSet` models."""
  91. @transaction_retry(max_retries=1)
  92. def restore_taskset(self, taskset_id):
  93. """Get taskset meta for task by ``taskset_id``."""
  94. try:
  95. return self.get(taskset_id=taskset_id)
  96. except self.model.DoesNotExist:
  97. return None
  98. @transaction_retry(max_retries=2)
  99. def store_result(self, taskset_id, result):
  100. """Store the result of a taskset.
  101. :param taskset_id: task set id
  102. :param result: The return value of the taskset
  103. """
  104. return self.update_or_create(taskset_id=taskset_id,
  105. defaults={"result": result})