瀏覽代碼

Fixed race condition in storing task task in DB

Jerzy Kozera 15 年之前
父節點
當前提交
0e4e001dd5
共有 1 個文件被更改,包括 27 次插入8 次删除
  1. 27 8
      celery/managers.py

+ 27 - 8
celery/managers.py

@@ -9,9 +9,24 @@ from celery.conf import TASK_RESULT_EXPIRES
 class TaskManager(models.Manager):
 class TaskManager(models.Manager):
     """Manager for :class:`celery.models.Task` models."""
     """Manager for :class:`celery.models.Task` models."""
 
 
-    def get_task(self, task_id):
-        """Get task meta for task by ``task_id``."""
-        task, created = self.get_or_create(task_id=task_id)
+    def get_task(self, task_id, exception_retry_count=1):
+        """Get task meta for task by ``task_id``.
+        
+        :keyword exception_retry_count: How many times to retry with
+            transaction rollback on exception. 1 by default: we assume
+            the pessimistic case when we get race condition in which
+            task is created by other process during get_or_create
+        """
+        try:
+            task, created = self.get_or_create(task_id=task_id)
+        except Exception, exc:
+            # depending on the database backend we can get various exceptions,
+            # so we catch every exception type
+            if exception_retry_count > 0:
+                transaction.rollback_unless_managed()
+                return self.get_task(task_id, exception_retry_count-1)
+            else:
+                raise
         return task
         return task
 
 
     def is_successful(self, task_id):
     def is_successful(self, task_id):
@@ -27,7 +42,7 @@ class TaskManager(models.Manager):
         self.get_all_expired().delete()
         self.get_all_expired().delete()
 
 
     def store_result(self, task_id, result, status, traceback=None,
     def store_result(self, task_id, result, status, traceback=None,
-            exception_retry=True):
+            exception_retry_count=2):
         """Store the result and status of a task.
         """Store the result and status of a task.
 
 
         :param task_id: task id
         :param task_id: task id
@@ -42,8 +57,12 @@ class TaskManager(models.Manager):
         :keyword traceback: The traceback at the point of exception (if the
         :keyword traceback: The traceback at the point of exception (if the
             task failed).
             task failed).
 
 
-        :keyword exception_retry: If True, we try a single retry with
-            transaction rollback on exception
+        :keyword exception_retry_count: How many times to retry with
+            transaction rollback on exception. 2 by default: we assume
+            the pessimistic case when task execution by itself could
+            leave broken transaction, and during second try we get
+            race condition in which task is created by other process
+            during get_or_create
         """
         """
         try:
         try:
             task, created = self.get_or_create(task_id=task_id, defaults={
             task, created = self.get_or_create(task_id=task_id, defaults={
@@ -60,8 +79,8 @@ class TaskManager(models.Manager):
             # for excample, psycopg2 raises an exception if some operation
             # for excample, psycopg2 raises an exception if some operation
             # breaks transaction, and saving task result won't be possible
             # breaks transaction, and saving task result won't be possible
             # until we rollback transaction
             # until we rollback transaction
-            if exception_retry:
+            if exception_retry_count > 0:
                 transaction.rollback_unless_managed()
                 transaction.rollback_unless_managed()
-                self.store_result(task_id, result, status, traceback, False)
+                self.store_result(task_id, result, status, traceback, exception_retry_count-1)
             else:
             else:
                 raise
                 raise