Browse Source

Try to rollback DB transaction on exception whensaving task result.

Jerzy Kozera 15 years ago
parent
commit
6f4d199501
1 changed files with 25 additions and 11 deletions
  1. 25 11
      celery/managers.py

+ 25 - 11
celery/managers.py

@@ -1,6 +1,6 @@
 """celery.managers"""
 from django.db import models
-from django.db import connection
+from django.db import connection, transaction
 from celery.registry import tasks
 from celery.conf import TASK_RESULT_EXPIRES
 from datetime import datetime, timedelta
@@ -80,7 +80,7 @@ class TaskManager(models.Manager):
         """Delete all expired task results."""
         self.get_all_expired().delete()
 
-    def store_result(self, task_id, result, status, traceback=None):
+    def store_result(self, task_id, result, status, traceback=None, exception_retry=True):
         """Store the result and status of a task.
 
         :param task_id: task id
@@ -95,16 +95,30 @@ class TaskManager(models.Manager):
         :keyword traceback: The traceback at the point of exception (if the
             task failed).
 
+        :keyword exception_retry: If we should retry storing by rollbacking 
+            transaction on exception
         """
-        task, created = self.get_or_create(task_id=task_id, defaults={
-                                            "status": status,
-                                            "result": result,
-                                            "traceback": traceback})
-        if not created:
-            task.status = status
-            task.result = result
-            task.traceback = traceback
-            task.save()
+        try:
+            task, created = self.get_or_create(task_id=task_id, defaults={
+                                                "status": status,
+                                                "result": result,
+                                                "traceback": traceback})
+            if not created:
+                task.status = status
+                task.result = result
+                task.traceback = traceback
+                task.save()
+        except:
+            # depending on the database backend we can get various exceptions.
+            # for excample, psycopg2 raises an exception if some operation
+            # breaks transaction, and saving task result won't be possible
+            # until we rollback transaction
+            if exception_retry:
+                transaction.rollback_unless_managed()
+                self.store_result(task_id, result, status, traceback, False)
+            else:
+                raise
+
 
 
 class PeriodicTaskManager(models.Manager):