django.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. """Django-specific customization."""
  2. from __future__ import absolute_import, unicode_literals
  3. import os
  4. import sys
  5. import warnings
  6. from kombu.utils.imports import symbol_by_name
  7. from kombu.utils.objects import cached_property
  8. from datetime import datetime
  9. from importlib import import_module
  10. from celery import _state
  11. from celery import signals
  12. from celery.exceptions import FixupWarning, ImproperlyConfigured
  13. __all__ = ['DjangoFixup', 'fixup']
  14. ERR_NOT_INSTALLED = """\
  15. Environment variable DJANGO_SETTINGS_MODULE is defined
  16. but Django isn't installed. Won't apply Django fix-ups!
  17. """
  18. def _maybe_close_fd(fh):
  19. try:
  20. os.close(fh.fileno())
  21. except (AttributeError, OSError, TypeError):
  22. # TypeError added for celery#962
  23. pass
  24. def _verify_django_version(django):
  25. if django.VERSION < (1, 8):
  26. raise ImproperlyConfigured('Celery 4.x requires Django 1.8 or later.')
  27. def fixup(app, env='DJANGO_SETTINGS_MODULE'):
  28. """Install Django fixup if settings module environment is set."""
  29. SETTINGS_MODULE = os.environ.get(env)
  30. if SETTINGS_MODULE and 'django' not in app.loader_cls.lower():
  31. try:
  32. import django # noqa
  33. except ImportError:
  34. warnings.warn(FixupWarning(ERR_NOT_INSTALLED))
  35. else:
  36. _verify_django_version(django)
  37. return DjangoFixup(app).install()
  38. class DjangoFixup(object):
  39. """Fixup installed when using Django."""
  40. def __init__(self, app):
  41. self.app = app
  42. if _state.default_app is None:
  43. self.app.set_default()
  44. self._worker_fixup = None
  45. def install(self):
  46. # Need to add project directory to path
  47. sys.path.append(os.getcwd())
  48. self._settings = symbol_by_name('django.conf:settings')
  49. self.app.loader.now = self.now
  50. signals.import_modules.connect(self.on_import_modules)
  51. signals.worker_init.connect(self.on_worker_init)
  52. return self
  53. @property
  54. def worker_fixup(self):
  55. if self._worker_fixup is None:
  56. self._worker_fixup = DjangoWorkerFixup(self.app)
  57. return self._worker_fixup
  58. @worker_fixup.setter
  59. def worker_fixup(self, value):
  60. self._worker_fixup = value
  61. def on_import_modules(self, **kwargs):
  62. # call django.setup() before task modules are imported
  63. self.worker_fixup.validate_models()
  64. def on_worker_init(self, **kwargs):
  65. self.worker_fixup.install()
  66. def now(self, utc=False):
  67. return datetime.utcnow() if utc else self._now()
  68. def autodiscover_tasks(self):
  69. from django.apps import apps
  70. return [config.name for config in apps.get_app_configs()]
  71. @cached_property
  72. def _now(self):
  73. return symbol_by_name('django.utils.timezone:now')
  74. class DjangoWorkerFixup(object):
  75. _db_recycles = 0
  76. def __init__(self, app):
  77. self.app = app
  78. self.db_reuse_max = self.app.conf.get('CELERY_DB_REUSE_MAX', None)
  79. self._db = import_module('django.db')
  80. self._cache = import_module('django.core.cache')
  81. self._settings = symbol_by_name('django.conf:settings')
  82. self.interface_errors = (
  83. symbol_by_name('django.db.utils.InterfaceError'),
  84. )
  85. self.DatabaseError = symbol_by_name('django.db:DatabaseError')
  86. def django_setup(self):
  87. import django
  88. django.setup()
  89. def validate_models(self):
  90. from django.core.checks import run_checks
  91. self.django_setup()
  92. run_checks()
  93. def install(self):
  94. signals.beat_embedded_init.connect(self.close_database)
  95. signals.worker_ready.connect(self.on_worker_ready)
  96. signals.task_prerun.connect(self.on_task_prerun)
  97. signals.task_postrun.connect(self.on_task_postrun)
  98. signals.worker_process_init.connect(self.on_worker_process_init)
  99. self.close_database()
  100. self.close_cache()
  101. return self
  102. def on_worker_process_init(self, **kwargs):
  103. # Child process must validate models again if on Windows,
  104. # or if they were started using execv.
  105. if os.environ.get('FORKED_BY_MULTIPROCESSING'):
  106. self.validate_models()
  107. # close connections:
  108. # the parent process may have established these,
  109. # so need to close them.
  110. # calling db.close() on some DB connections will cause
  111. # the inherited DB conn to also get broken in the parent
  112. # process so we need to remove it without triggering any
  113. # network IO that close() might cause.
  114. for c in self._db.connections.all():
  115. if c and c.connection:
  116. self._maybe_close_db_fd(c.connection)
  117. # use the _ version to avoid DB_REUSE preventing the conn.close() call
  118. self._close_database()
  119. self.close_cache()
  120. def _maybe_close_db_fd(self, fd):
  121. try:
  122. _maybe_close_fd(fd)
  123. except self.interface_errors:
  124. pass
  125. def on_task_prerun(self, sender, **kwargs):
  126. """Called before every task."""
  127. if not getattr(sender.request, 'is_eager', False):
  128. self.close_database()
  129. def on_task_postrun(self, sender, **kwargs):
  130. # See http://groups.google.com/group/django-users/
  131. # browse_thread/thread/78200863d0c07c6d/
  132. if not getattr(sender.request, 'is_eager', False):
  133. self.close_database()
  134. self.close_cache()
  135. def close_database(self, **kwargs):
  136. if not self.db_reuse_max:
  137. return self._close_database()
  138. if self._db_recycles >= self.db_reuse_max * 2:
  139. self._db_recycles = 0
  140. self._close_database()
  141. self._db_recycles += 1
  142. def _close_database(self):
  143. for conn in self._db.connections.all():
  144. try:
  145. conn.close()
  146. except self.interface_errors:
  147. pass
  148. except self.DatabaseError as exc:
  149. str_exc = str(exc)
  150. if 'closed' not in str_exc and 'not connected' not in str_exc:
  151. raise
  152. def close_cache(self):
  153. try:
  154. self._cache.cache.close()
  155. except (TypeError, AttributeError):
  156. pass
  157. def on_worker_ready(self, **kwargs):
  158. if self._settings.DEBUG:
  159. warnings.warn('Using settings.DEBUG leads to a memory leak, never '
  160. 'use this setting in production environments!')