test_django.py 9.7 KB


  1. import pytest
  2. from contextlib import contextmanager
  3. from case import Mock, mock, patch
  4. from celery.fixups.django import (
  5. _maybe_close_fd,
  6. fixup,
  7. FixupWarning,
  8. DjangoFixup,
  9. DjangoWorkerFixup,
  10. )
  11. class FixupCase:
  12. Fixup = None
  13. @contextmanager
  14. def fixup_context(self, app):
  15. with patch('celery.fixups.django.DjangoWorkerFixup.validate_models'):
  16. with patch('celery.fixups.django.symbol_by_name') as symbyname:
  17. with patch('celery.fixups.django.import_module') as impmod:
  18. f = self.Fixup(app)
  19. yield f, impmod, symbyname
  20. class test_DjangoFixup(FixupCase):
  21. Fixup = DjangoFixup
  22. def test_setting_default_app(self):
  23. from celery import _state
  24. prev, _state.default_app = _state.default_app, None
  25. try:
  26. app = Mock(name='app')
  27. DjangoFixup(app)
  28. app.set_default.assert_called_with()
  29. finally:
  30. _state.default_app = prev
  31. @patch('celery.fixups.django.DjangoWorkerFixup')
  32. def test_worker_fixup_property(self, DjangoWorkerFixup):
  33. f = DjangoFixup(self.app)
  34. f._worker_fixup = None
  35. assert f.worker_fixup is DjangoWorkerFixup()
  36. assert f.worker_fixup is DjangoWorkerFixup()
  37. def test_on_import_modules(self):
  38. f = DjangoFixup(self.app)
  39. f.worker_fixup = Mock(name='worker_fixup')
  40. f.on_import_modules()
  41. f.worker_fixup.validate_models.assert_called_with()
  42. def test_autodiscover_tasks(self, patching):
  43. patching.modules('django.apps')
  44. from django.apps import apps
  45. f = DjangoFixup(self.app)
  46. configs = [Mock(name='c1'), Mock(name='c2')]
  47. apps.get_app_configs.return_value = configs
  48. assert f.autodiscover_tasks() == [c.name for c in configs]
  49. def test_fixup(self, patching):
  50. with patch('celery.fixups.django.DjangoFixup') as Fixup:
  51. patching.setenv('DJANGO_SETTINGS_MODULE', '')
  52. fixup(self.app)
  53. Fixup.assert_not_called()
  54. patching.setenv('DJANGO_SETTINGS_MODULE', 'settings')
  55. with mock.mask_modules('django'):
  56. with pytest.warns(FixupWarning):
  57. fixup(self.app)
  58. Fixup.assert_not_called()
  59. with mock.module_exists('django'):
  60. import django
  61. django.VERSION = (1, 10, 1)
  62. fixup(self.app)
  63. Fixup.assert_called()
  64. def test_maybe_close_fd(self):
  65. with patch('os.close'):
  66. _maybe_close_fd(Mock())
  67. _maybe_close_fd(object())
  68. def test_init(self):
  69. with self.fixup_context(self.app) as (f, importmod, sym):
  70. assert f
  71. def test_install(self, patching):
  72. self.app.loader = Mock()
  73. self.cw = patching('os.getcwd')
  74. self.p = patching('sys.path')
  75. self.sigs = patching('celery.fixups.django.signals')
  76. with self.fixup_context(self.app) as (f, _, _):
  77. self.cw.return_value = '/opt/vandelay'
  78. f.install()
  79. self.sigs.worker_init.connect.assert_called_with(f.on_worker_init)
  80. assert self.app.loader.now == f.now
  81. self.p.append.assert_called_with('/opt/vandelay')
  82. def test_now(self):
  83. with self.fixup_context(self.app) as (f, _, _):
  84. assert f.now(utc=True)
  85. f._now.assert_not_called()
  86. assert f.now(utc=False)
  87. f._now.assert_called()
  88. def test_on_worker_init(self):
  89. with self.fixup_context(self.app) as (f, _, _):
  90. with patch('celery.fixups.django.DjangoWorkerFixup') as DWF:
  91. f.on_worker_init()
  92. DWF.assert_called_with(f.app)
  93. DWF.return_value.install.assert_called_with()
  94. assert f._worker_fixup is DWF.return_value
  95. class test_DjangoWorkerFixup(FixupCase):
  96. Fixup = DjangoWorkerFixup
  97. def test_init(self):
  98. with self.fixup_context(self.app) as (f, importmod, sym):
  99. assert f
  100. def test_install(self):
  101. self.app.conf = {'CELERY_DB_REUSE_MAX': None}
  102. self.app.loader = Mock()
  103. with self.fixup_context(self.app) as (f, _, _):
  104. with patch('celery.fixups.django.signals') as sigs:
  105. f.install()
  106. sigs.beat_embedded_init.connect.assert_called_with(
  107. f.close_database,
  108. )
  109. sigs.worker_ready.connect.assert_called_with(f.on_worker_ready)
  110. sigs.task_prerun.connect.assert_called_with(f.on_task_prerun)
  111. sigs.task_postrun.connect.assert_called_with(f.on_task_postrun)
  112. sigs.worker_process_init.connect.assert_called_with(
  113. f.on_worker_process_init,
  114. )
  115. def test_on_worker_process_init(self, patching):
  116. with self.fixup_context(self.app) as (f, _, _):
  117. with patch('celery.fixups.django._maybe_close_fd') as mcf:
  118. _all = f._db.connections.all = Mock()
  119. conns = _all.return_value = [
  120. Mock(), Mock(),
  121. ]
  122. conns[0].connection = None
  123. with patch.object(f, 'close_cache'):
  124. with patch.object(f, '_close_database'):
  125. f.on_worker_process_init()
  126. mcf.assert_called_with(conns[1].connection)
  127. f.close_cache.assert_called_with()
  128. f._close_database.assert_called_with()
  129. f.validate_models = Mock(name='validate_models')
  130. patching.setenv('FORKED_BY_MULTIPROCESSING', '1')
  131. f.on_worker_process_init()
  132. f.validate_models.assert_called_with()
  133. def test_on_task_prerun(self):
  134. task = Mock()
  135. with self.fixup_context(self.app) as (f, _, _):
  136. task.request.is_eager = False
  137. with patch.object(f, 'close_database'):
  138. f.on_task_prerun(task)
  139. f.close_database.assert_called_with()
  140. task.request.is_eager = True
  141. with patch.object(f, 'close_database'):
  142. f.on_task_prerun(task)
  143. f.close_database.assert_not_called()
  144. def test_on_task_postrun(self):
  145. task = Mock()
  146. with self.fixup_context(self.app) as (f, _, _):
  147. with patch.object(f, 'close_cache'):
  148. task.request.is_eager = False
  149. with patch.object(f, 'close_database'):
  150. f.on_task_postrun(task)
  151. f.close_database.assert_called()
  152. f.close_cache.assert_called()
  153. # when a task is eager, don't close connections
  154. with patch.object(f, 'close_cache'):
  155. task.request.is_eager = True
  156. with patch.object(f, 'close_database'):
  157. f.on_task_postrun(task)
  158. f.close_database.assert_not_called()
  159. f.close_cache.assert_not_called()
  160. def test_close_database(self):
  161. with self.fixup_context(self.app) as (f, _, _):
  162. with patch.object(f, '_close_database') as _close:
  163. f.db_reuse_max = None
  164. f.close_database()
  165. _close.assert_called_with()
  166. _close.reset_mock()
  167. f.db_reuse_max = 10
  168. f._db_recycles = 3
  169. f.close_database()
  170. _close.assert_not_called()
  171. assert f._db_recycles == 4
  172. _close.reset_mock()
  173. f._db_recycles = 20
  174. f.close_database()
  175. _close.assert_called_with()
  176. assert f._db_recycles == 1
  177. def test__close_database(self):
  178. with self.fixup_context(self.app) as (f, _, _):
  179. conns = [Mock(), Mock(), Mock()]
  180. conns[1].close.side_effect = KeyError('already closed')
  181. f.DatabaseError = KeyError
  182. f.interface_errors = ()
  183. f._db.connections = Mock() # ConnectionHandler
  184. f._db.connections.all.side_effect = lambda: conns
  185. f._close_database()
  186. conns[0].close.assert_called_with()
  187. conns[1].close.assert_called_with()
  188. conns[2].close.assert_called_with()
  189. conns[1].close.side_effect = KeyError('omg')
  190. with pytest.raises(KeyError):
  191. f._close_database()
  192. def test_close_cache(self):
  193. with self.fixup_context(self.app) as (f, _, _):
  194. f.close_cache()
  195. f._cache.cache.close.assert_called_with()
  196. f._cache.cache.close.side_effect = TypeError()
  197. f.close_cache()
  198. def test_on_worker_ready(self):
  199. with self.fixup_context(self.app) as (f, _, _):
  200. f._settings.DEBUG = False
  201. f.on_worker_ready()
  202. with pytest.warns(UserWarning):
  203. f._settings.DEBUG = True
  204. f.on_worker_ready()
  205. def test_validate_models(self, patching):
  206. with mock.module('django', 'django.db', 'django.core',
  207. 'django.core.cache', 'django.conf',
  208. 'django.db.utils'):
  209. f = self.Fixup(self.app)
  210. f.django_setup = Mock(name='django.setup')
  211. patching.modules('django.core.checks')
  212. from django.core.checks import run_checks
  213. f.validate_models()
  214. f.django_setup.assert_called_with()
  215. run_checks.assert_called_with()
  216. def test_django_setup(self, patching):
  217. patching('celery.fixups.django.symbol_by_name')
  218. patching('celery.fixups.django.import_module')
  219. django, = patching.modules('django')
  220. f = self.Fixup(self.app)
  221. f.django_setup()
  222. django.setup.assert_called_with()