django.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. from __future__ import absolute_import
  2. import os
  3. import sys
  4. import warnings
  5. from datetime import datetime
  6. from celery import signals
  7. from celery.exceptions import FixupWarning
  8. SETTINGS_MODULE = os.environ.get('DJANGO_SETTINGS_MODULE')
  9. ERR_NOT_INSTALLED = """\
  10. Environment variable DJANGO_SETTINGS_MODULE is defined
  11. but Django is not installed. Will not apply Django fixups!
  12. """
  13. def _maybe_close_fd(fh):
  14. try:
  15. os.close(fh.fileno())
  16. except (AttributeError, OSError, TypeError):
  17. # TypeError added for celery#962
  18. pass
  19. def fixup(app):
  20. if SETTINGS_MODULE:
  21. try:
  22. import django # noqa
  23. except ImportError:
  24. warnings.warn(FixupWarning(ERR_NOT_INSTALLED))
  25. return DjangoFixup(app).install()
  26. class DjangoFixup(object):
  27. _db_recycles = 0
  28. def __init__(self, app):
  29. from django import db
  30. from django.core import cache
  31. from django.conf import settings
  32. from django.core.mail import mail_admins
  33. # Current time and date
  34. try:
  35. from django.utils.timezone import now
  36. except ImportError: # pre django-1.4
  37. now = datetime.now # noqa
  38. # Database-related exceptions.
  39. from django.db import DatabaseError
  40. try:
  41. import MySQLdb as mysql
  42. _my_database_errors = (mysql.DatabaseError,
  43. mysql.InterfaceError,
  44. mysql.OperationalError)
  45. except ImportError:
  46. _my_database_errors = () # noqa
  47. try:
  48. import psycopg2 as pg
  49. _pg_database_errors = (pg.DatabaseError,
  50. pg.InterfaceError,
  51. pg.OperationalError)
  52. except ImportError:
  53. _pg_database_errors = () # noqa
  54. try:
  55. import sqlite3
  56. _lite_database_errors = (sqlite3.DatabaseError,
  57. sqlite3.InterfaceError,
  58. sqlite3.OperationalError)
  59. except ImportError:
  60. _lite_database_errors = () # noqa
  61. try:
  62. import cx_Oracle as oracle
  63. _oracle_database_errors = (oracle.DatabaseError,
  64. oracle.InterfaceError,
  65. oracle.OperationalError)
  66. except ImportError:
  67. _oracle_database_errors = () # noqa
  68. try:
  69. from django.db import close_old_connections
  70. self._close_old_connections = close_old_connections
  71. except ImportError:
  72. self._close_old_connections = None
  73. self.app = app
  74. self.db_reuse_max = self.app.conf.get('CELERY_DB_REUSE_MAX', None)
  75. self._cache = cache
  76. self._settings = settings
  77. self._db = db
  78. self._mail_admins = mail_admins
  79. self._now = now
  80. self.database_errors = (
  81. (DatabaseError, ) +
  82. _my_database_errors +
  83. _pg_database_errors +
  84. _lite_database_errors +
  85. _oracle_database_errors,
  86. )
  87. def install(self):
  88. # Need to add project directory to path
  89. sys.path.append(os.getcwd())
  90. signals.beat_embedded_init.connect(self.close_database)
  91. signals.worker_ready.connect(self.on_worker_ready)
  92. signals.task_prerun.connect(self.on_task_prerun)
  93. signals.task_postrun.connect(self.on_task_postrun)
  94. signals.worker_init.connect(self.on_worker_init)
  95. signals.worker_process_init.connect(self.on_worker_process_init)
  96. self.app.loader.now = self.now
  97. self.app.loader.mail_admins = self.mail_admins
  98. return self
  99. def now(self, utc=False):
  100. return datetime.utcnow() if utc else self._now()
  101. def mail_admins(self, subject, body, fail_silently=False, **kwargs):
  102. return self._mail_admins(subject, body, fail_silently=fail_silently)
  103. def on_worker_init(self, **kwargs):
  104. """Called when the worker starts.
  105. Automatically discovers any ``tasks.py`` files in the applications
  106. listed in ``INSTALLED_APPS``.
  107. """
  108. self.close_database()
  109. self.close_cache()
  110. def on_worker_process_init(self, **kwargs):
  111. # the parent process may have established these,
  112. # so need to close them.
  113. # calling db.close() on some DB connections will cause
  114. # the inherited DB conn to also get broken in the parent
  115. # process so we need to remove it without triggering any
  116. # network IO that close() might cause.
  117. try:
  118. for c in self._db.connections.all():
  119. if c and c.connection:
  120. _maybe_close_fd(c.connection)
  121. except AttributeError:
  122. if self._db.connection and self._db.connection.connection:
  123. _maybe_close_fd(self._db.connection.connection)
  124. # use the _ version to avoid DB_REUSE preventing the conn.close() call
  125. self._close_database()
  126. self.close_cache()
  127. def on_task_prerun(self, sender, **kwargs):
  128. """Called before every task."""
  129. if not getattr(sender.request, 'is_eager', False):
  130. self.close_database()
  131. def on_task_postrun(self, **kwargs):
  132. """Does everything necessary for Django to work in a long-living,
  133. multiprocessing environment.
  134. """
  135. # See http://groups.google.com/group/django-users/
  136. # browse_thread/thread/78200863d0c07c6d/
  137. self.close_database()
  138. self.close_cache()
  139. def close_database(self, **kwargs):
  140. if self.close_old_connections:
  141. return self.close_old_connections() # Django 1.6
  142. if not self.db_reuse_max:
  143. return self._close_database()
  144. if self._db_recycles >= self.db_reuse_max * 2:
  145. self._db_recycles = 0
  146. self._close_database()
  147. self._db_recycles += 1
  148. def _close_database(self):
  149. try:
  150. funs = [conn.close for conn in self._db.connections]
  151. except AttributeError:
  152. funs = [self._db.close_connection] # pre multidb
  153. for close in funs:
  154. try:
  155. close()
  156. except self.database_errors as exc:
  157. str_exc = str(exc)
  158. if 'closed' not in str_exc and 'not connected' not in str_exc:
  159. raise
  160. def close_cache(self):
  161. try:
  162. self._cache.cache.close()
  163. except (TypeError, AttributeError):
  164. pass
  165. def on_worker_ready(self, **kwargs):
  166. if self._settings.DEBUG:
  167. warnings.warn('Using settings.DEBUG leads to a memory leak, never '
  168. 'use this setting in production environments!')