test_django.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. from __future__ import absolute_import
  2. import os
  3. from contextlib import contextmanager
  4. from celery.fixups.django import (
  5. _maybe_close_fd,
  6. fixup,
  7. DjangoFixup,
  8. DjangoWorkerFixup,
  9. )
  10. from celery.tests.case import (
  11. AppCase, Mock, patch, patch_many, patch_modules, mask_modules,
  12. )
  13. class FixupCase(AppCase):
  14. Fixup = None
  15. @contextmanager
  16. def fixup_context(self, app):
  17. with patch('celery.fixups.django.DjangoWorkerFixup.validate_models'):
  18. with patch('celery.fixups.django.symbol_by_name') as symbyname:
  19. with patch('celery.fixups.django.import_module') as impmod:
  20. f = self.Fixup(app)
  21. yield f, impmod, symbyname
  22. class test_DjangoFixup(FixupCase):
  23. Fixup = DjangoFixup
  24. def test_fixup(self):
  25. with patch('celery.fixups.django.DjangoFixup') as Fixup:
  26. with patch.dict(os.environ, DJANGO_SETTINGS_MODULE=''):
  27. fixup(self.app)
  28. self.assertFalse(Fixup.called)
  29. with patch.dict(os.environ, DJANGO_SETTINGS_MODULE='settings'):
  30. with mask_modules('django'):
  31. with self.assertWarnsRegex(UserWarning, 'but Django is'):
  32. fixup(self.app)
  33. self.assertFalse(Fixup.called)
  34. with patch_modules('django'):
  35. fixup(self.app)
  36. self.assertTrue(Fixup.called)
  37. def test_maybe_close_fd(self):
  38. with patch('os.close'):
  39. _maybe_close_fd(Mock())
  40. _maybe_close_fd(object())
  41. def test_init(self):
  42. with self.fixup_context(self.app) as (f, importmod, sym):
  43. self.assertTrue(f)
  44. def se(name):
  45. if name == 'django.utils.timezone:now':
  46. raise ImportError()
  47. return Mock()
  48. sym.side_effect = se
  49. self.assertTrue(self.Fixup(self.app)._now)
  50. def test_install(self):
  51. self.app.loader = Mock()
  52. with self.fixup_context(self.app) as (f, _, _):
  53. with patch_many('os.getcwd', 'sys.path',
  54. 'celery.fixups.django.signals') as (cw, p, sigs):
  55. cw.return_value = '/opt/vandelay'
  56. f.install()
  57. sigs.worker_init.connect.assert_called_with(f.on_worker_init)
  58. self.assertEqual(self.app.loader.now, f.now)
  59. self.assertEqual(self.app.loader.mail_admins, f.mail_admins)
  60. p.append.assert_called_with('/opt/vandelay')
  61. def test_now(self):
  62. with self.fixup_context(self.app) as (f, _, _):
  63. self.assertTrue(f.now(utc=True))
  64. self.assertFalse(f._now.called)
  65. self.assertTrue(f.now(utc=False))
  66. self.assertTrue(f._now.called)
  67. def test_mail_admins(self):
  68. with self.fixup_context(self.app) as (f, _, _):
  69. f.mail_admins('sub', 'body', True)
  70. f._mail_admins.assert_called_with(
  71. 'sub', 'body', fail_silently=True,
  72. )
  73. def test_on_worker_init(self):
  74. with self.fixup_context(self.app) as (f, _, _):
  75. with patch('celery.fixups.django.DjangoWorkerFixup') as DWF:
  76. f.on_worker_init()
  77. DWF.assert_called_with(f.app)
  78. DWF.return_value.install.assert_called_with()
  79. self.assertIs(f._worker_fixup, DWF.return_value)
  80. class test_DjangoWorkerFixup(FixupCase):
  81. Fixup = DjangoWorkerFixup
  82. def test_init(self):
  83. with self.fixup_context(self.app) as (f, importmod, sym):
  84. self.assertTrue(f)
  85. def se(name):
  86. if name == 'django.db:close_old_connections':
  87. raise ImportError()
  88. return Mock()
  89. sym.side_effect = se
  90. self.assertIsNone(self.Fixup(self.app)._close_old_connections)
  91. def test_install(self):
  92. self.app.conf = {'CELERY_DB_REUSE_MAX': None}
  93. self.app.loader = Mock()
  94. with self.fixup_context(self.app) as (f, _, _):
  95. with patch_many('celery.fixups.django.signals') as (sigs,):
  96. f.install()
  97. sigs.beat_embedded_init.connect.assert_called_with(
  98. f.close_database,
  99. )
  100. sigs.worker_ready.connect.assert_called_with(f.on_worker_ready)
  101. sigs.task_prerun.connect.assert_called_with(f.on_task_prerun)
  102. sigs.task_postrun.connect.assert_called_with(f.on_task_postrun)
  103. sigs.worker_process_init.connect.assert_called_with(
  104. f.on_worker_process_init,
  105. )
  106. def test_on_worker_process_init(self):
  107. with self.fixup_context(self.app) as (f, _, _):
  108. with patch('celery.fixups.django._maybe_close_fd') as mcf:
  109. _all = f._db.connections.all = Mock()
  110. conns = _all.return_value = [
  111. Mock(), Mock(),
  112. ]
  113. conns[0].connection = None
  114. with patch.object(f, 'close_cache'):
  115. with patch.object(f, '_close_database'):
  116. f.on_worker_process_init()
  117. mcf.assert_called_with(conns[1].connection)
  118. f.close_cache.assert_called_with()
  119. f._close_database.assert_called_with()
  120. mcf.reset_mock()
  121. _all.side_effect = AttributeError()
  122. f.on_worker_process_init()
  123. mcf.assert_called_with(f._db.connection.connection)
  124. f._db.connection = None
  125. f.on_worker_process_init()
  126. def test_on_task_prerun(self):
  127. task = Mock()
  128. with self.fixup_context(self.app) as (f, _, _):
  129. task.request.is_eager = False
  130. with patch.object(f, 'close_database'):
  131. f.on_task_prerun(task)
  132. f.close_database.assert_called_with()
  133. task.request.is_eager = True
  134. with patch.object(f, 'close_database'):
  135. f.on_task_prerun(task)
  136. self.assertFalse(f.close_database.called)
  137. def test_on_task_postrun(self):
  138. task = Mock()
  139. with self.fixup_context(self.app) as (f, _, _):
  140. with patch.object(f, 'close_cache'):
  141. task.request.is_eager = False
  142. with patch.object(f, 'close_database'):
  143. f.on_task_postrun(task)
  144. self.assertTrue(f.close_database.called)
  145. self.assertTrue(f.close_cache.called)
  146. # when a task is eager, do not close connections
  147. with patch.object(f, 'close_cache'):
  148. task.request.is_eager = True
  149. with patch.object(f, 'close_database'):
  150. f.on_task_postrun(task)
  151. self.assertFalse(f.close_database.called)
  152. self.assertFalse(f.close_cache.called)
  153. def test_close_database(self):
  154. with self.fixup_context(self.app) as (f, _, _):
  155. f._close_old_connections = Mock()
  156. f.close_database()
  157. f._close_old_connections.assert_called_with()
  158. f._close_old_connections = None
  159. with patch.object(f, '_close_database') as _close:
  160. f.db_reuse_max = None
  161. f.close_database()
  162. _close.assert_called_with()
  163. _close.reset_mock()
  164. f.db_reuse_max = 10
  165. f._db_recycles = 3
  166. f.close_database()
  167. self.assertFalse(_close.called)
  168. self.assertEqual(f._db_recycles, 4)
  169. _close.reset_mock()
  170. f._db_recycles = 20
  171. f.close_database()
  172. _close.assert_called_with()
  173. self.assertEqual(f._db_recycles, 1)
  174. def test__close_database(self):
  175. with self.fixup_context(self.app) as (f, _, _):
  176. conns = [Mock(), Mock(), Mock()]
  177. conns[1].close.side_effect = KeyError('already closed')
  178. f.database_errors = (KeyError,)
  179. f._db.connections = Mock() # ConnectionHandler
  180. f._db.connections.all.side_effect = lambda: conns
  181. f._close_database()
  182. conns[0].close.assert_called_with()
  183. conns[1].close.assert_called_with()
  184. conns[2].close.assert_called_with()
  185. conns[1].close.side_effect = KeyError('omg')
  186. with self.assertRaises(KeyError):
  187. f._close_database()
  188. class Object(object):
  189. pass
  190. o = Object()
  191. o.close_connection = Mock()
  192. f._db = o
  193. f._close_database()
  194. o.close_connection.assert_called_with()
  195. def test_close_cache(self):
  196. with self.fixup_context(self.app) as (f, _, _):
  197. f.close_cache()
  198. f._cache.cache.close.assert_called_with()
  199. f._cache.cache.close.side_effect = TypeError()
  200. f.close_cache()
  201. def test_on_worker_ready(self):
  202. with self.fixup_context(self.app) as (f, _, _):
  203. f._settings.DEBUG = False
  204. f.on_worker_ready()
  205. with self.assertWarnsRegex(UserWarning, r'leads to a memory leak'):
  206. f._settings.DEBUG = True
  207. f.on_worker_ready()
  208. def test_mysql_errors(self):
  209. with patch_modules('MySQLdb'):
  210. import MySQLdb as mod
  211. mod.DatabaseError = Mock()
  212. mod.InterfaceError = Mock()
  213. mod.OperationalError = Mock()
  214. with self.fixup_context(self.app) as (f, _, _):
  215. self.assertIn(mod.DatabaseError, f.database_errors)
  216. self.assertIn(mod.InterfaceError, f.database_errors)
  217. self.assertIn(mod.OperationalError, f.database_errors)
  218. with mask_modules('MySQLdb'):
  219. with self.fixup_context(self.app):
  220. pass
  221. def test_pg_errors(self):
  222. with patch_modules('psycopg2'):
  223. import psycopg2 as mod
  224. mod.DatabaseError = Mock()
  225. mod.InterfaceError = Mock()
  226. mod.OperationalError = Mock()
  227. with self.fixup_context(self.app) as (f, _, _):
  228. self.assertIn(mod.DatabaseError, f.database_errors)
  229. self.assertIn(mod.InterfaceError, f.database_errors)
  230. self.assertIn(mod.OperationalError, f.database_errors)
  231. with mask_modules('psycopg2'):
  232. with self.fixup_context(self.app):
  233. pass
  234. def test_sqlite_errors(self):
  235. with patch_modules('sqlite3'):
  236. import sqlite3 as mod
  237. mod.DatabaseError = Mock()
  238. mod.InterfaceError = Mock()
  239. mod.OperationalError = Mock()
  240. with self.fixup_context(self.app) as (f, _, _):
  241. self.assertIn(mod.DatabaseError, f.database_errors)
  242. self.assertIn(mod.InterfaceError, f.database_errors)
  243. self.assertIn(mod.OperationalError, f.database_errors)
  244. with mask_modules('sqlite3'):
  245. with self.fixup_context(self.app):
  246. pass
  247. def test_oracle_errors(self):
  248. with patch_modules('cx_Oracle'):
  249. import cx_Oracle as mod
  250. mod.DatabaseError = Mock()
  251. mod.InterfaceError = Mock()
  252. mod.OperationalError = Mock()
  253. with self.fixup_context(self.app) as (f, _, _):
  254. self.assertIn(mod.DatabaseError, f.database_errors)
  255. self.assertIn(mod.InterfaceError, f.database_errors)
  256. self.assertIn(mod.OperationalError, f.database_errors)
  257. with mask_modules('cx_Oracle'):
  258. with self.fixup_context(self.app):
  259. pass