session.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. # -*- coding: utf-8 -*-
  2. """SQLAlchemy session."""
  3. from __future__ import absolute_import, unicode_literals
  4. from kombu.utils.compat import register_after_fork
  5. from sqlalchemy import create_engine
  6. from sqlalchemy.ext.declarative import declarative_base
  7. from sqlalchemy.orm import sessionmaker
  8. from sqlalchemy.pool import NullPool
  9. ResultModelBase = declarative_base()
  10. __all__ = ('SessionManager',)
  11. def _after_fork_cleanup_session(session):
  12. session._after_fork()
  13. class SessionManager(object):
  14. """Manage SQLAlchemy sessions."""
  15. def __init__(self):
  16. self._engines = {}
  17. self._sessions = {}
  18. self.forked = False
  19. self.prepared = False
  20. if register_after_fork is not None:
  21. register_after_fork(self, _after_fork_cleanup_session)
  22. def _after_fork(self):
  23. self.forked = True
  24. def get_engine(self, dburi, **kwargs):
  25. if self.forked:
  26. try:
  27. return self._engines[dburi]
  28. except KeyError:
  29. engine = self._engines[dburi] = create_engine(dburi, **kwargs)
  30. return engine
  31. else:
  32. return create_engine(dburi, poolclass=NullPool)
  33. def create_session(self, dburi, short_lived_sessions=False, **kwargs):
  34. engine = self.get_engine(dburi, **kwargs)
  35. if self.forked:
  36. if short_lived_sessions or dburi not in self._sessions:
  37. self._sessions[dburi] = sessionmaker(bind=engine)
  38. return engine, self._sessions[dburi]
  39. else:
  40. return engine, sessionmaker(bind=engine)
  41. def prepare_models(self, engine):
  42. if not self.prepared:
  43. ResultModelBase.metadata.create_all(engine)
  44. self.prepared = True
  45. def session_factory(self, dburi, **kwargs):
  46. engine, session = self.create_session(dburi, **kwargs)
  47. self.prepare_models(engine)
  48. return session()