test_django.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. from __future__ import absolute_import, unicode_literals
  2. from contextlib import contextmanager
  3. import pytest
  4. from case import Mock, mock, patch
  5. from celery.fixups.django import (DjangoFixup, DjangoWorkerFixup,
  6. FixupWarning, _maybe_close_fd, fixup)
  7. class FixupCase:
  8. Fixup = None
  9. @contextmanager
  10. def fixup_context(self, app):
  11. with patch('celery.fixups.django.DjangoWorkerFixup.validate_models'):
  12. with patch('celery.fixups.django.symbol_by_name') as symbyname:
  13. with patch('celery.fixups.django.import_module') as impmod:
  14. f = self.Fixup(app)
  15. yield f, impmod, symbyname
  16. class test_DjangoFixup(FixupCase):
  17. Fixup = DjangoFixup
  18. def test_setting_default_app(self):
  19. from celery import _state
  20. prev, _state.default_app = _state.default_app, None
  21. try:
  22. app = Mock(name='app')
  23. DjangoFixup(app)
  24. app.set_default.assert_called_with()
  25. finally:
  26. _state.default_app = prev
  27. @patch('celery.fixups.django.DjangoWorkerFixup')
  28. def test_worker_fixup_property(self, DjangoWorkerFixup):
  29. f = DjangoFixup(self.app)
  30. f._worker_fixup = None
  31. assert f.worker_fixup is DjangoWorkerFixup()
  32. assert f.worker_fixup is DjangoWorkerFixup()
  33. def test_on_import_modules(self):
  34. f = DjangoFixup(self.app)
  35. f.worker_fixup = Mock(name='worker_fixup')
  36. f.on_import_modules()
  37. f.worker_fixup.validate_models.assert_called_with()
  38. def test_autodiscover_tasks(self, patching):
  39. patching.modules('django.apps')
  40. from django.apps import apps
  41. f = DjangoFixup(self.app)
  42. configs = [Mock(name='c1'), Mock(name='c2')]
  43. apps.get_app_configs.return_value = configs
  44. assert f.autodiscover_tasks() == [c.name for c in configs]
  45. def test_fixup(self, patching):
  46. with patch('celery.fixups.django.DjangoFixup') as Fixup:
  47. patching.setenv('DJANGO_SETTINGS_MODULE', '')
  48. fixup(self.app)
  49. Fixup.assert_not_called()
  50. patching.setenv('DJANGO_SETTINGS_MODULE', 'settings')
  51. with mock.mask_modules('django'):
  52. with pytest.warns(FixupWarning):
  53. fixup(self.app)
  54. Fixup.assert_not_called()
  55. with mock.module_exists('django'):
  56. import django
  57. django.VERSION = (1, 10, 1)
  58. fixup(self.app)
  59. Fixup.assert_called()
  60. def test_maybe_close_fd(self):
  61. with patch('os.close'):
  62. _maybe_close_fd(Mock())
  63. _maybe_close_fd(object())
  64. def test_init(self):
  65. with self.fixup_context(self.app) as (f, importmod, sym):
  66. assert f
  67. def test_install(self, patching):
  68. self.app.loader = Mock()
  69. self.cw = patching('os.getcwd')
  70. self.p = patching('sys.path')
  71. self.sigs = patching('celery.fixups.django.signals')
  72. with self.fixup_context(self.app) as (f, _, _):
  73. self.cw.return_value = '/opt/vandelay'
  74. f.install()
  75. self.sigs.worker_init.connect.assert_called_with(f.on_worker_init)
  76. assert self.app.loader.now == f.now
  77. self.p.append.assert_called_with('/opt/vandelay')
  78. def test_now(self):
  79. with self.fixup_context(self.app) as (f, _, _):
  80. assert f.now(utc=True)
  81. f._now.assert_not_called()
  82. assert f.now(utc=False)
  83. f._now.assert_called()
  84. def test_on_worker_init(self):
  85. with self.fixup_context(self.app) as (f, _, _):
  86. with patch('celery.fixups.django.DjangoWorkerFixup') as DWF:
  87. f.on_worker_init()
  88. DWF.assert_called_with(f.app)
  89. DWF.return_value.install.assert_called_with()
  90. assert f._worker_fixup is DWF.return_value
  91. class test_DjangoWorkerFixup(FixupCase):
  92. Fixup = DjangoWorkerFixup
  93. def test_init(self):
  94. with self.fixup_context(self.app) as (f, importmod, sym):
  95. assert f
  96. def test_install(self):
  97. self.app.conf = {'CELERY_DB_REUSE_MAX': None}
  98. self.app.loader = Mock()
  99. with self.fixup_context(self.app) as (f, _, _):
  100. with patch('celery.fixups.django.signals') as sigs:
  101. f.install()
  102. sigs.beat_embedded_init.connect.assert_called_with(
  103. f.close_database,
  104. )
  105. sigs.worker_ready.connect.assert_called_with(f.on_worker_ready)
  106. sigs.task_prerun.connect.assert_called_with(f.on_task_prerun)
  107. sigs.task_postrun.connect.assert_called_with(f.on_task_postrun)
  108. sigs.worker_process_init.connect.assert_called_with(
  109. f.on_worker_process_init,
  110. )
  111. def test_on_worker_process_init(self, patching):
  112. with self.fixup_context(self.app) as (f, _, _):
  113. with patch('celery.fixups.django._maybe_close_fd') as mcf:
  114. _all = f._db.connections.all = Mock()
  115. conns = _all.return_value = [
  116. Mock(), Mock(),
  117. ]
  118. conns[0].connection = None
  119. with patch.object(f, 'close_cache'):
  120. with patch.object(f, '_close_database'):
  121. f.on_worker_process_init()
  122. mcf.assert_called_with(conns[1].connection)
  123. f.close_cache.assert_called_with()
  124. f._close_database.assert_called_with()
  125. f.validate_models = Mock(name='validate_models')
  126. patching.setenv('FORKED_BY_MULTIPROCESSING', '1')
  127. f.on_worker_process_init()
  128. f.validate_models.assert_called_with()
  129. def test_on_task_prerun(self):
  130. task = Mock()
  131. with self.fixup_context(self.app) as (f, _, _):
  132. task.request.is_eager = False
  133. with patch.object(f, 'close_database'):
  134. f.on_task_prerun(task)
  135. f.close_database.assert_called_with()
  136. task.request.is_eager = True
  137. with patch.object(f, 'close_database'):
  138. f.on_task_prerun(task)
  139. f.close_database.assert_not_called()
  140. def test_on_task_postrun(self):
  141. task = Mock()
  142. with self.fixup_context(self.app) as (f, _, _):
  143. with patch.object(f, 'close_cache'):
  144. task.request.is_eager = False
  145. with patch.object(f, 'close_database'):
  146. f.on_task_postrun(task)
  147. f.close_database.assert_called()
  148. f.close_cache.assert_called()
  149. # when a task is eager, don't close connections
  150. with patch.object(f, 'close_cache'):
  151. task.request.is_eager = True
  152. with patch.object(f, 'close_database'):
  153. f.on_task_postrun(task)
  154. f.close_database.assert_not_called()
  155. f.close_cache.assert_not_called()
  156. def test_close_database(self):
  157. with self.fixup_context(self.app) as (f, _, _):
  158. with patch.object(f, '_close_database') as _close:
  159. f.db_reuse_max = None
  160. f.close_database()
  161. _close.assert_called_with()
  162. _close.reset_mock()
  163. f.db_reuse_max = 10
  164. f._db_recycles = 3
  165. f.close_database()
  166. _close.assert_not_called()
  167. assert f._db_recycles == 4
  168. _close.reset_mock()
  169. f._db_recycles = 20
  170. f.close_database()
  171. _close.assert_called_with()
  172. assert f._db_recycles == 1
  173. def test__close_database(self):
  174. with self.fixup_context(self.app) as (f, _, _):
  175. conns = [Mock(), Mock(), Mock()]
  176. conns[1].close.side_effect = KeyError('already closed')
  177. f.DatabaseError = KeyError
  178. f.interface_errors = ()
  179. f._db.connections = Mock() # ConnectionHandler
  180. f._db.connections.all.side_effect = lambda: conns
  181. f._close_database()
  182. conns[0].close_if_unusable_or_obsolete.assert_called_with()
  183. conns[1].close_if_unusable_or_obsolete.assert_called_with()
  184. conns[2].close_if_unusable_or_obsolete.assert_called_with()
  185. conns[1].close_if_unusable_or_obsolete.side_effect = KeyError(
  186. 'omg')
  187. with pytest.raises(KeyError):
  188. f._close_database()
  189. def test_close_cache(self):
  190. with self.fixup_context(self.app) as (f, _, _):
  191. f.close_cache()
  192. f._cache.close_caches.assert_called_with()
  193. def test_on_worker_ready(self):
  194. with self.fixup_context(self.app) as (f, _, _):
  195. f._settings.DEBUG = False
  196. f.on_worker_ready()
  197. with pytest.warns(UserWarning):
  198. f._settings.DEBUG = True
  199. f.on_worker_ready()
  200. def test_validate_models(self, patching):
  201. with mock.module('django', 'django.db', 'django.core',
  202. 'django.core.cache', 'django.conf',
  203. 'django.db.utils'):
  204. f = self.Fixup(self.app)
  205. f.django_setup = Mock(name='django.setup')
  206. patching.modules('django.core.checks')
  207. from django.core.checks import run_checks
  208. f.validate_models()
  209. f.django_setup.assert_called_with()
  210. run_checks.assert_called_with()
  211. def test_django_setup(self, patching):
  212. patching('celery.fixups.django.symbol_by_name')
  213. patching('celery.fixups.django.import_module')
  214. django, = patching.modules('django')
  215. f = self.Fixup(self.app)
  216. f.django_setup()
  217. django.setup.assert_called_with()