Przeglądaj źródła

Retry sqlalchemy backend operations on DatabaseError/OperationalError. Closes #634

Ask Solem 13 lat temu
rodzic
commit
74410c9849
1 zmienionych plików z 29 dodań i 2 usunięć
  1. 29 2
      celery/backends/database/__init__.py

+ 29 - 2
celery/backends/database/__init__.py

@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
+from functools import wraps
+
 from celery import states
 from celery.exceptions import ImproperlyConfigured
 from celery.utils.timeutils import maybe_timedelta
@@ -21,6 +23,24 @@ def _sqlalchemy_installed():
     return sqlalchemy
 _sqlalchemy_installed()
 
+from sqlalchemy.exc import DatabaseError, OperationalError
+
+
+def retry(fun):
+
+    @wraps(fun)
+    def _inner(*args, **kwargs):
+        max_retries = kwargs.pop("max_retries", 3)
+
+        for retries in xrange(max_retries + 1):
+            try:
+                return fun(*args, **kwargs)
+            except (DatabaseError, OperationalError):
+                if retries + 1 > max_retries:
+                    raise
+
+    return _inner
+
 
 class DatabaseBackend(BaseDictBackend):
     """The database result backend."""
@@ -49,7 +69,9 @@ class DatabaseBackend(BaseDictBackend):
                     short_lived_sessions=self.short_lived_sessions,
                     **self.engine_options)
 
-    def _store_result(self, task_id, result, status, traceback=None):
+    @retry
+    def _store_result(self, task_id, result, status, traceback=None,
+            max_retries=3):
         """Store return value and status of an executed task."""
         session = self.ResultSession()
         try:
@@ -62,10 +84,11 @@ class DatabaseBackend(BaseDictBackend):
             task.status = status
             task.traceback = traceback
             session.commit()
+            return result
         finally:
             session.close()
-        return result
 
+    @retry
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
         session = self.ResultSession()
@@ -79,6 +102,7 @@ class DatabaseBackend(BaseDictBackend):
         finally:
             session.close()
 
+    @retry
     def _save_taskset(self, taskset_id, result):
         """Store the result of an executed taskset."""
         session = self.ResultSession()
@@ -91,6 +115,7 @@ class DatabaseBackend(BaseDictBackend):
         finally:
             session.close()
 
+    @retry
     def _restore_taskset(self, taskset_id):
         """Get metadata for taskset by id."""
         session = self.ResultSession()
@@ -102,6 +127,7 @@ class DatabaseBackend(BaseDictBackend):
         finally:
             session.close()
 
+    @retry
     def _delete_taskset(self, taskset_id):
         """Delete metadata for taskset by id."""
         session = self.ResultSession()
@@ -113,6 +139,7 @@ class DatabaseBackend(BaseDictBackend):
         finally:
             session.close()
 
+    @retry
     def _forget(self, task_id):
         """Forget about result."""
         session = self.ResultSession()