django.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. from __future__ import absolute_import
  2. import os
  3. import sys
  4. import warnings
  5. from kombu.utils import symbol_by_name
  6. from datetime import datetime
  7. from importlib import import_module
  8. from celery import signals
  9. from celery.exceptions import FixupWarning
  10. __all__ = ['DjangoFixup', 'fixup']
  11. ERR_NOT_INSTALLED = """\
  12. Environment variable DJANGO_SETTINGS_MODULE is defined
  13. but Django is not installed. Will not apply Django fixups!
  14. """
  15. def _maybe_close_fd(fh):
  16. try:
  17. os.close(fh.fileno())
  18. except (AttributeError, OSError, TypeError):
  19. # TypeError added for celery#962
  20. pass
  21. def fixup(app, env='DJANGO_SETTINGS_MODULE'):
  22. SETTINGS_MODULE = os.environ.get(env)
  23. if SETTINGS_MODULE and 'django' not in app.loader_cls.lower():
  24. try:
  25. import django # noqa
  26. except ImportError:
  27. warnings.warn(FixupWarning(ERR_NOT_INSTALLED))
  28. else:
  29. return DjangoFixup(app).install()
  30. class DjangoFixup(object):
  31. _db_recycles = 0
  32. def __init__(self, app):
  33. self.app = app
  34. self.app.set_default()
  35. self.db_reuse_max = self.app.conf.get('CELERY_DB_REUSE_MAX', None)
  36. self._db = import_module('django.db')
  37. self._cache = import_module('django.core.cache')
  38. self._settings = symbol_by_name('django.conf:settings')
  39. self._mail_admins = symbol_by_name('django.core.mail:mail_admins')
  40. # Current time and date
  41. try:
  42. self._now = symbol_by_name('django.utils.timezone:now')
  43. except ImportError: # pre django-1.4
  44. self._now = datetime.now # noqa
  45. # Database-related exceptions.
  46. DatabaseError = symbol_by_name('django.db:DatabaseError')
  47. try:
  48. import MySQLdb as mysql
  49. _my_database_errors = (mysql.DatabaseError,
  50. mysql.InterfaceError,
  51. mysql.OperationalError)
  52. except ImportError:
  53. _my_database_errors = () # noqa
  54. try:
  55. import psycopg2 as pg
  56. _pg_database_errors = (pg.DatabaseError,
  57. pg.InterfaceError,
  58. pg.OperationalError)
  59. except ImportError:
  60. _pg_database_errors = () # noqa
  61. try:
  62. import sqlite3
  63. _lite_database_errors = (sqlite3.DatabaseError,
  64. sqlite3.InterfaceError,
  65. sqlite3.OperationalError)
  66. except ImportError:
  67. _lite_database_errors = () # noqa
  68. try:
  69. import cx_Oracle as oracle
  70. _oracle_database_errors = (oracle.DatabaseError,
  71. oracle.InterfaceError,
  72. oracle.OperationalError)
  73. except ImportError:
  74. _oracle_database_errors = () # noqa
  75. try:
  76. self._close_old_connections = symbol_by_name(
  77. 'django.db:close_old_connections',
  78. )
  79. except (ImportError, AttributeError):
  80. self._close_old_connections = None
  81. self.database_errors = (
  82. (DatabaseError, ) +
  83. _my_database_errors +
  84. _pg_database_errors +
  85. _lite_database_errors +
  86. _oracle_database_errors
  87. )
  88. def install(self):
  89. # Need to add project directory to path
  90. sys.path.append(os.getcwd())
  91. signals.beat_embedded_init.connect(self.close_database)
  92. signals.worker_ready.connect(self.on_worker_ready)
  93. signals.task_prerun.connect(self.on_task_prerun)
  94. signals.task_postrun.connect(self.on_task_postrun)
  95. signals.worker_init.connect(self.on_worker_init)
  96. signals.worker_process_init.connect(self.on_worker_process_init)
  97. self.app.loader.now = self.now
  98. self.app.loader.mail_admins = self.mail_admins
  99. return self
  100. def now(self, utc=False):
  101. return datetime.utcnow() if utc else self._now()
  102. def mail_admins(self, subject, body, fail_silently=False, **kwargs):
  103. return self._mail_admins(subject, body, fail_silently=fail_silently)
  104. def on_worker_init(self, **kwargs):
  105. """Called when the worker starts.
  106. Automatically discovers any ``tasks.py`` files in the applications
  107. listed in ``INSTALLED_APPS``.
  108. """
  109. self.close_database()
  110. self.close_cache()
  111. def on_worker_process_init(self, **kwargs):
  112. # the parent process may have established these,
  113. # so need to close them.
  114. # calling db.close() on some DB connections will cause
  115. # the inherited DB conn to also get broken in the parent
  116. # process so we need to remove it without triggering any
  117. # network IO that close() might cause.
  118. try:
  119. for c in self._db.connections.all():
  120. if c and c.connection:
  121. _maybe_close_fd(c.connection)
  122. except AttributeError:
  123. if self._db.connection and self._db.connection.connection:
  124. _maybe_close_fd(self._db.connection.connection)
  125. # use the _ version to avoid DB_REUSE preventing the conn.close() call
  126. self._close_database()
  127. self.close_cache()
  128. def on_task_prerun(self, sender, **kwargs):
  129. """Called before every task."""
  130. if not getattr(sender.request, 'is_eager', False):
  131. self.close_database()
  132. def on_task_postrun(self, sender, **kwargs):
  133. # See http://groups.google.com/group/django-users/
  134. # browse_thread/thread/78200863d0c07c6d/
  135. if not getattr(sender.request, 'is_eager', False):
  136. self.close_database()
  137. self.close_cache()
  138. def close_database(self, **kwargs):
  139. if self._close_old_connections:
  140. return self._close_old_connections() # Django 1.6
  141. if not self.db_reuse_max:
  142. return self._close_database()
  143. if self._db_recycles >= self.db_reuse_max * 2:
  144. self._db_recycles = 0
  145. self._close_database()
  146. self._db_recycles += 1
  147. def _close_database(self):
  148. try:
  149. funs = [conn.close for conn in self._db.connections]
  150. except AttributeError:
  151. if hasattr(self._db, 'close_old_connections'): # django 1.6
  152. funs = [self._db.close_old_connections]
  153. else:
  154. # pre multidb, pending deprication in django 1.6
  155. funs = [self._db.close_connection]
  156. for close in funs:
  157. try:
  158. close()
  159. except self.database_errors as exc:
  160. str_exc = str(exc)
  161. if 'closed' not in str_exc and 'not connected' not in str_exc:
  162. raise
  163. def close_cache(self):
  164. try:
  165. self._cache.cache.close()
  166. except (TypeError, AttributeError):
  167. pass
  168. def on_worker_ready(self, **kwargs):
  169. if self._settings.DEBUG:
  170. warnings.warn('Using settings.DEBUG leads to a memory leak, never '
  171. 'use this setting in production environments!')