django.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. from __future__ import absolute_import
  2. import os
  3. import sys
  4. import warnings
  5. from kombu.utils import cached_property, symbol_by_name
  6. from datetime import datetime
  7. from importlib import import_module
  8. from celery import signals
  9. from celery.app import default_app
  10. from celery.exceptions import FixupWarning
  11. if sys.version_info[0] < 3 and not hasattr(sys, 'pypy_version_info'):
  12. from StringIO import StringIO
  13. else: # pragma: no cover
  14. from io import StringIO
  15. __all__ = ['DjangoFixup', 'fixup']
  16. ERR_NOT_INSTALLED = """\
  17. Environment variable DJANGO_SETTINGS_MODULE is defined
  18. but Django is not installed. Will not apply Django fixups!
  19. """
  20. def _maybe_close_fd(fh):
  21. try:
  22. os.close(fh.fileno())
  23. except (AttributeError, OSError, TypeError):
  24. # TypeError added for celery#962
  25. pass
  26. def fixup(app, env='DJANGO_SETTINGS_MODULE'):
  27. SETTINGS_MODULE = os.environ.get(env)
  28. if SETTINGS_MODULE and 'django' not in app.loader_cls.lower():
  29. try:
  30. import django # noqa
  31. except ImportError:
  32. warnings.warn(FixupWarning(ERR_NOT_INSTALLED))
  33. else:
  34. return DjangoFixup(app).install()
  35. class DjangoFixup(object):
  36. def __init__(self, app):
  37. self.app = app
  38. if default_app is None:
  39. self.app.set_default()
  40. self._worker_fixup = None
  41. def install(self):
  42. # Need to add project directory to path
  43. sys.path.append(os.getcwd())
  44. self._settings = symbol_by_name('django.conf:settings')
  45. self.app.loader.now = self.now
  46. self.app.loader.mail_admins = self.mail_admins
  47. signals.import_modules.connect(self.on_import_modules)
  48. signals.worker_init.connect(self.on_worker_init)
  49. return self
  50. @property
  51. def worker_fixup(self):
  52. if self._worker_fixup is None:
  53. self._worker_fixup = DjangoWorkerFixup(self.app)
  54. return self._worker_fixup
  55. @worker_fixup.setter
  56. def worker_fixup(self, value):
  57. self._worker_fixup = value
  58. def on_import_modules(self, **kwargs):
  59. # call django.setup() before task modules are imported
  60. self.worker_fixup.validate_models()
  61. def on_worker_init(self, **kwargs):
  62. self.worker_fixup.install()
  63. def now(self, utc=False):
  64. return datetime.utcnow() if utc else self._now()
  65. def mail_admins(self, subject, body, fail_silently=False, **kwargs):
  66. return self._mail_admins(subject, body, fail_silently=fail_silently)
  67. def autodiscover_tasks(self):
  68. try:
  69. from django.apps import apps
  70. except ImportError:
  71. return self._settings.INSTALLED_APPS
  72. else:
  73. return [config.name for config in apps.get_app_configs()]
  74. @cached_property
  75. def _mail_admins(self):
  76. return symbol_by_name('django.core.mail:mail_admins')
  77. @cached_property
  78. def _now(self):
  79. try:
  80. return symbol_by_name('django.utils.timezone:now')
  81. except (AttributeError, ImportError): # pre django-1.4
  82. return datetime.now
  83. class DjangoWorkerFixup(object):
  84. _db_recycles = 0
  85. def __init__(self, app):
  86. self.app = app
  87. self.db_reuse_max = self.app.conf.get('CELERY_DB_REUSE_MAX', None)
  88. self._db = import_module('django.db')
  89. self._cache = import_module('django.core.cache')
  90. self._settings = symbol_by_name('django.conf:settings')
  91. # Database-related exceptions.
  92. DatabaseError = symbol_by_name('django.db:DatabaseError')
  93. try:
  94. import MySQLdb as mysql
  95. _my_database_errors = (mysql.DatabaseError,
  96. mysql.InterfaceError,
  97. mysql.OperationalError)
  98. except ImportError:
  99. _my_database_errors = () # noqa
  100. try:
  101. import psycopg2 as pg
  102. _pg_database_errors = (pg.DatabaseError,
  103. pg.InterfaceError,
  104. pg.OperationalError)
  105. except ImportError:
  106. _pg_database_errors = () # noqa
  107. try:
  108. import sqlite3
  109. _lite_database_errors = (sqlite3.DatabaseError,
  110. sqlite3.InterfaceError,
  111. sqlite3.OperationalError)
  112. except ImportError:
  113. _lite_database_errors = () # noqa
  114. try:
  115. import cx_Oracle as oracle
  116. _oracle_database_errors = (oracle.DatabaseError,
  117. oracle.InterfaceError,
  118. oracle.OperationalError)
  119. except ImportError:
  120. _oracle_database_errors = () # noqa
  121. try:
  122. self._close_old_connections = symbol_by_name(
  123. 'django.db:close_old_connections',
  124. )
  125. except (ImportError, AttributeError):
  126. self._close_old_connections = None
  127. self.database_errors = (
  128. (DatabaseError,) +
  129. _my_database_errors +
  130. _pg_database_errors +
  131. _lite_database_errors +
  132. _oracle_database_errors
  133. )
  134. def django_setup(self):
  135. import django
  136. try:
  137. django_setup = django.setup
  138. except AttributeError: # pragma: no cover
  139. pass
  140. else:
  141. django_setup()
  142. def validate_models(self):
  143. self.django_setup()
  144. try:
  145. from django.core.management.validation import get_validation_errors
  146. except ImportError:
  147. self._validate_models_django17()
  148. else:
  149. s = StringIO()
  150. num_errors = get_validation_errors(s, None)
  151. if num_errors:
  152. raise RuntimeError(
  153. 'One or more Django models did not validate:\n{0}'.format(
  154. s.getvalue()))
  155. def _validate_models_django17(self):
  156. from django.core.management import base
  157. print(base)
  158. cmd = base.BaseCommand()
  159. try:
  160. cmd.stdout = base.OutputWrapper(sys.stdout)
  161. cmd.stderr = base.OutputWrapper(sys.stderr)
  162. except ImportError: # before django 1.5
  163. cmd.stdout, cmd.stderr = sys.stdout, sys.stderr
  164. cmd.check()
  165. def install(self):
  166. signals.beat_embedded_init.connect(self.close_database)
  167. signals.worker_ready.connect(self.on_worker_ready)
  168. signals.task_prerun.connect(self.on_task_prerun)
  169. signals.task_postrun.connect(self.on_task_postrun)
  170. signals.worker_process_init.connect(self.on_worker_process_init)
  171. self.close_database()
  172. self.close_cache()
  173. return self
  174. def on_worker_process_init(self, **kwargs):
  175. # Child process must validate models again if on Windows,
  176. # or if they were started using execv.
  177. if os.environ.get('FORKED_BY_MULTIPROCESSING'):
  178. self.validate_models()
  179. # close connections:
  180. # the parent process may have established these,
  181. # so need to close them.
  182. # calling db.close() on some DB connections will cause
  183. # the inherited DB conn to also get broken in the parent
  184. # process so we need to remove it without triggering any
  185. # network IO that close() might cause.
  186. try:
  187. for c in self._db.connections.all():
  188. if c and c.connection:
  189. _maybe_close_fd(c.connection)
  190. except AttributeError:
  191. if self._db.connection and self._db.connection.connection:
  192. _maybe_close_fd(self._db.connection.connection)
  193. # use the _ version to avoid DB_REUSE preventing the conn.close() call
  194. self._close_database()
  195. self.close_cache()
  196. def on_task_prerun(self, sender, **kwargs):
  197. """Called before every task."""
  198. if not getattr(sender.request, 'is_eager', False):
  199. self.close_database()
  200. def on_task_postrun(self, sender, **kwargs):
  201. # See http://groups.google.com/group/django-users/
  202. # browse_thread/thread/78200863d0c07c6d/
  203. if not getattr(sender.request, 'is_eager', False):
  204. self.close_database()
  205. self.close_cache()
  206. def close_database(self, **kwargs):
  207. if self._close_old_connections:
  208. return self._close_old_connections() # Django 1.6
  209. if not self.db_reuse_max:
  210. return self._close_database()
  211. if self._db_recycles >= self.db_reuse_max * 2:
  212. self._db_recycles = 0
  213. self._close_database()
  214. self._db_recycles += 1
  215. def _close_database(self):
  216. try:
  217. funs = [conn.close for conn in self._db.connections.all()]
  218. except AttributeError:
  219. if hasattr(self._db, 'close_old_connections'): # django 1.6
  220. funs = [self._db.close_old_connections]
  221. else:
  222. # pre multidb, pending deprication in django 1.6
  223. funs = [self._db.close_connection]
  224. for close in funs:
  225. try:
  226. close()
  227. except self.database_errors as exc:
  228. str_exc = str(exc)
  229. if 'closed' not in str_exc and 'not connected' not in str_exc:
  230. raise
  231. def close_cache(self):
  232. try:
  233. self._cache.cache.close()
  234. except (TypeError, AttributeError):
  235. pass
  236. def on_worker_ready(self, **kwargs):
  237. if self._settings.DEBUG:
  238. warnings.warn('Using settings.DEBUG leads to a memory leak, never '
  239. 'use this setting in production environments!')