|
@@ -1,6 +1,6 @@
|
|
|
"""celery.managers"""
|
|
|
from django.db import models
|
|
|
-from django.db import connection
|
|
|
+from django.db import connection, transaction
|
|
|
from celery.registry import tasks
|
|
|
from celery.conf import TASK_RESULT_EXPIRES
|
|
|
from datetime import datetime, timedelta
|
|
@@ -80,7 +80,7 @@ class TaskManager(models.Manager):
|
|
|
"""Delete all expired task results."""
|
|
|
self.get_all_expired().delete()
|
|
|
|
|
|
- def store_result(self, task_id, result, status, traceback=None):
|
|
|
+ def store_result(self, task_id, result, status, traceback=None, exception_retry=True):
|
|
|
"""Store the result and status of a task.
|
|
|
|
|
|
:param task_id: task id
|
|
@@ -95,16 +95,30 @@ class TaskManager(models.Manager):
|
|
|
:keyword traceback: The traceback at the point of exception (if the
|
|
|
task failed).
|
|
|
|
|
|
+ :keyword exception_retry: If we should retry storing by rollbacking
|
|
|
+ transaction on exception
|
|
|
"""
|
|
|
- task, created = self.get_or_create(task_id=task_id, defaults={
|
|
|
- "status": status,
|
|
|
- "result": result,
|
|
|
- "traceback": traceback})
|
|
|
- if not created:
|
|
|
- task.status = status
|
|
|
- task.result = result
|
|
|
- task.traceback = traceback
|
|
|
- task.save()
|
|
|
+ try:
|
|
|
+ task, created = self.get_or_create(task_id=task_id, defaults={
|
|
|
+ "status": status,
|
|
|
+ "result": result,
|
|
|
+ "traceback": traceback})
|
|
|
+ if not created:
|
|
|
+ task.status = status
|
|
|
+ task.result = result
|
|
|
+ task.traceback = traceback
|
|
|
+ task.save()
|
|
|
+ except:
|
|
|
+ # depending on the database backend we can get various exceptions.
|
|
|
+ # for excample, psycopg2 raises an exception if some operation
|
|
|
+ # breaks transaction, and saving task result won't be possible
|
|
|
+ # until we rollback transaction
|
|
|
+ if exception_retry:
|
|
|
+ transaction.rollback_unless_managed()
|
|
|
+ self.store_result(task_id, result, status, traceback, False)
|
|
|
+ else:
|
|
|
+ raise
|
|
|
+
|
|
|
|
|
|
|
|
|
class PeriodicTaskManager(models.Manager):
|