Browse Source

Use a session manager that has different behavior before the fork (effectivelly hardcodes NullPool - everything else is unreliable).

Ionel Cristian Mărieș 11 years ago
parent
commit
6e8ab99dc9
2 changed files with 54 additions and 56 deletions
  1. 8 5
      celery/backends/database/__init__.py
  2. 46 51
      celery/backends/database/session.py

+ 8 - 5
celery/backends/database/__init__.py

@@ -20,7 +20,7 @@ from celery.utils.timeutils import maybe_timedelta
 
 from .models import Task
 from .models import TaskSet
-from .session import ResultSession
+from .session import SessionManager
 
 logger = logging.getLogger(__name__)
 
@@ -37,7 +37,7 @@ def _sqlalchemy_installed():
     return sqlalchemy
 _sqlalchemy_installed()
 
-from sqlalchemy.exc import DatabaseError, OperationalError, ResourceClosedError, InvalidRequestError
+from sqlalchemy.exc import DatabaseError, OperationalError, ResourceClosedError, InvalidRequestError, IntegrityError
 from sqlalchemy.orm.exc import StaleDataError
 
 
@@ -61,7 +61,10 @@ def retry(fun):
         for retries in range(max_retries):
             try:
                 return fun(*args, **kwargs)
-            except (DatabaseError, OperationalError, ResourceClosedError, StaleDataError, InvalidRequestError):
+            except (
+                DatabaseError, OperationalError, ResourceClosedError, StaleDataError, InvalidRequestError,
+                IntegrityError
+            ):
                 logger.warning(
                     "Failed operation %s. Retrying %s more times.",
                     fun.__name__, max_retries - retries - 1,
@@ -104,8 +107,8 @@ class DatabaseBackend(BaseBackend):
                 'Missing connection string! Do you have '
                 'CELERY_RESULT_DBURI set to a real value?')
 
-    def ResultSession(self):
-        return ResultSession(
+    def ResultSession(self, session_manager=SessionManager()):
+        return session_manager.session_factory(
             dburi=self.dburi,
             short_lived_sessions=self.short_lived_sessions,
             **self.engine_options

+ 46 - 51
celery/backends/database/session.py

@@ -8,60 +8,55 @@
 """
 from __future__ import absolute_import
 
-from collections import defaultdict
-from multiprocessing.util import register_after_fork
+from billiard.util import register_after_fork
 
 from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker
 from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.pool import NullPool
 
 ResultModelBase = declarative_base()
 
-_SETUP = defaultdict(lambda: False)
-_ENGINES = {}
-_SESSIONS = {}
-
-__all__ = ['ResultSession', 'get_engine', 'create_session']
-
-
-class _after_fork(object):
-    registered = False
-
-    def __call__(self):
-        self.registered = False  # child must reregister
-        for session in _SESSIONS:
-            session.close()
-        _SESSIONS.clear()
-        for engine in list(_ENGINES.values()):
-            engine.dispose()
-        _ENGINES.clear()
-after_fork = _after_fork()
-
-
-def get_engine(dburi, **kwargs):
-    try:
-        return _ENGINES[dburi]
-    except KeyError:
-        engine = _ENGINES[dburi] = create_engine(dburi, **kwargs)
-        after_fork.registered = True
-        register_after_fork(after_fork, after_fork)
-        return engine
-
-
-def create_session(dburi, short_lived_sessions=False, **kwargs):
-    engine = get_engine(dburi, **kwargs)
-    if short_lived_sessions or dburi not in _SESSIONS:
-        _SESSIONS[dburi] = sessionmaker(bind=engine)
-    return engine, _SESSIONS[dburi]
-
-
-def setup_results(engine):
-    if not _SETUP['results']:
-        ResultModelBase.metadata.create_all(engine)
-        _SETUP['results'] = True
-
-
-def ResultSession(dburi, **kwargs):
-    engine, session = create_session(dburi, **kwargs)
-    setup_results(engine)
-    return session()
+__all__ = ['SessionManager']
+
+
+class SessionManager(object):
+    def __init__(self):
+        self._engines = {}
+        self._sessions = {}
+        self.forked = False
+        self.prepared = False
+        register_after_fork(self, self._after_fork)
+
+    def _after_fork(self,):
+        self.forked = True
+
+    def get_engine(self, dburi, **kwargs):
+        if self.forked:
+            try:
+                return self._engines[dburi]
+            except KeyError:
+                engine = self._engines[dburi] = create_engine(dburi, **kwargs)
+                return engine
+        else:
+            kwargs['poolclass'] = NullPool
+            return create_engine(dburi, **kwargs)
+
+    def create_session(self, dburi, short_lived_sessions=False, **kwargs):
+        engine = self.get_engine(dburi, **kwargs)
+        if self.forked:
+            if short_lived_sessions or dburi not in self._sessions:
+                self._sessions[dburi] = sessionmaker(bind=engine)
+            return engine, self._sessions[dburi]
+        else:
+            return engine, sessionmaker(bind=engine)
+
+    def prepare_models(self, engine):
+        if not self.prepared:
+            ResultModelBase.metadata.create_all(engine)
+            self.prepared = True
+
+    def session_factory(self, dburi, **kwargs):
+        engine, session = self.create_session(dburi, **kwargs)
+        self.prepare_models(engine)
+        return session()