managers.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. from datetime import datetime
  2. from itertools import count
  3. from billiard.utils.functional import wraps
  4. from django.db import models
  5. from django.db import transaction
  6. from django.db.models.query import QuerySet
  7. from celery import states
  8. def transaction_retry(max_retries=1):
  9. """Decorator for methods doing database operations.
  10. If the database operation fails, it will retry the operation
  11. at most ``max_retries`` times.
  12. """
  13. def _outer(fun):
  14. @wraps(fun)
  15. def _inner(*args, **kwargs):
  16. _max_retries = kwargs.pop("exception_retry_count", max_retries)
  17. for retries in count(0):
  18. try:
  19. return fun(*args, **kwargs)
  20. except Exception: # pragma: no cover
  21. # Depending on the database backend used we can experience
  22. # various exceptions. E.g. psycopg2 raises an exception
  23. # if some operation breaks the transaction, so saving
  24. # the task result won't be possible until we rollback
  25. # the transaction.
  26. if retries >= _max_retries:
  27. raise
  28. transaction.rollback_unless_managed()
  29. return _inner
  30. return _outer
  31. def update_model_with_dict(obj, fields):
  32. [setattr(obj, attr_name, attr_value)
  33. for attr_name, attr_value in fields.items()]
  34. obj.save()
  35. return obj
  36. class ExtendedQuerySet(QuerySet):
  37. def update_or_create(self, **kwargs):
  38. obj, created = self.get_or_create(**kwargs)
  39. if not created:
  40. fields = dict(kwargs.pop("defaults", {}))
  41. fields.update(kwargs)
  42. update_model_with_dict(obj, fields)
  43. return obj
  44. class ExtendedManager(models.Manager):
  45. def get_query_set(self):
  46. return ExtendedQuerySet(self.model)
  47. def update_or_create(self, **kwargs):
  48. return self.get_query_set().update_or_create(**kwargs)
  49. class ResultManager(ExtendedManager):
  50. def get_all_expired(self):
  51. """Get all expired task results."""
  52. from celery import conf
  53. expires = conf.TASK_RESULT_EXPIRES
  54. return self.filter(date_done__lt=datetime.now() - expires)
  55. def delete_expired(self):
  56. """Delete all expired taskset results."""
  57. self.get_all_expired().delete()
  58. class TaskManager(ResultManager):
  59. """Manager for :class:`celery.models.Task` models."""
  60. @transaction_retry(max_retries=1)
  61. def get_task(self, task_id):
  62. """Get task meta for task by ``task_id``.
  63. :keyword exception_retry_count: How many times to retry by
  64. transaction rollback on exception. This could theoretically
  65. happen in a race condition if another worker is trying to
  66. create the same task. The default is to retry once.
  67. """
  68. task, created = self.get_or_create(task_id=task_id)
  69. return task
  70. @transaction_retry(max_retries=2)
  71. def store_result(self, task_id, result, status, traceback=None):
  72. """Store the result and status of a task.
  73. :param task_id: task id
  74. :param result: The return value of the task, or an exception
  75. instance raised by the task.
  76. :param status: Task status. See
  77. :meth:`celery.result.AsyncResult.get_status` for a list of
  78. possible status values.
  79. :keyword traceback: The traceback at the point of exception (if the
  80. task failed).
  81. :keyword exception_retry_count: How many times to retry by
  82. transaction rollback on exception. This could theoretically
  83. happen in a race condition if another worker is trying to
  84. create the same task. The default is to retry twice.
  85. """
  86. return self.update_or_create(task_id=task_id, defaults={
  87. "status": status,
  88. "result": result,
  89. "traceback": traceback})
  90. class TaskSetManager(ResultManager):
  91. """Manager for :class:`celery.models.TaskSet` models."""
  92. @transaction_retry(max_retries=1)
  93. def restore_taskset(self, taskset_id):
  94. """Get taskset meta for task by ``taskset_id``."""
  95. try:
  96. return self.get(taskset_id=taskset_id)
  97. except self.model.DoesNotExist:
  98. return None
  99. @transaction_retry(max_retries=2)
  100. def store_result(self, taskset_id, result):
  101. """Store the result of a taskset.
  102. :param taskset_id: task set id
  103. :param result: The return value of the taskset
  104. """
  105. return self.update_or_create(taskset_id=taskset_id,
  106. defaults={"result": result})