test_loaders.py 8.2 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import os
  3. import sys
  4. import warnings
  5. import pytest
  6. from case import Mock, mock, patch
  7. from celery import loaders
  8. from celery.exceptions import NotConfigured
  9. from celery.five import bytes_if_py2
  10. from celery.loaders import base, default
  11. from celery.loaders.app import AppLoader
  12. from celery.utils.imports import NotAPackage
  13. class DummyLoader(base.BaseLoader):
  14. def read_configuration(self):
  15. return {'foo': 'bar', 'imports': ('os', 'sys')}
  16. class test_loaders:
  17. def test_get_loader_cls(self):
  18. assert loaders.get_loader_cls('default') is default.Loader
  19. class test_LoaderBase:
  20. message_options = {'subject': 'Subject',
  21. 'body': 'Body',
  22. 'sender': 'x@x.com',
  23. 'to': 'y@x.com'}
  24. server_options = {'host': 'smtp.x.com',
  25. 'port': 1234,
  26. 'user': 'x',
  27. 'password': 'qwerty',
  28. 'timeout': 3}
  29. def setup(self):
  30. self.loader = DummyLoader(app=self.app)
  31. def test_handlers_pass(self):
  32. self.loader.on_task_init('foo.task', 'feedface-cafebabe')
  33. self.loader.on_worker_init()
  34. def test_now(self):
  35. assert self.loader.now(utc=True)
  36. assert self.loader.now(utc=False)
  37. def test_read_configuration_no_env(self):
  38. assert base.BaseLoader(app=self.app).read_configuration(
  39. 'FOO_X_S_WE_WQ_Q_WE') is None
  40. def test_autodiscovery(self):
  41. with patch('celery.loaders.base.autodiscover_tasks') as auto:
  42. auto.return_value = [Mock()]
  43. auto.return_value[0].__name__ = 'moo'
  44. self.loader.autodiscover_tasks(['A', 'B'])
  45. assert 'moo' in self.loader.task_modules
  46. self.loader.task_modules.discard('moo')
  47. def test_import_task_module(self):
  48. assert sys == self.loader.import_task_module('sys')
  49. def test_init_worker_process(self):
  50. self.loader.on_worker_process_init()
  51. m = self.loader.on_worker_process_init = Mock()
  52. self.loader.init_worker_process()
  53. m.assert_called_with()
  54. def test_config_from_object_module(self):
  55. self.loader.import_from_cwd = Mock()
  56. self.loader.config_from_object('module_name')
  57. self.loader.import_from_cwd.assert_called_with('module_name')
  58. def test_conf_property(self):
  59. assert self.loader.conf['foo'] == 'bar'
  60. assert self.loader._conf['foo'] == 'bar'
  61. assert self.loader.conf['foo'] == 'bar'
  62. def test_import_default_modules(self):
  63. def modnames(l):
  64. return [m.__name__ for m in l]
  65. self.app.conf.imports = ('os', 'sys')
  66. assert (sorted(modnames(self.loader.import_default_modules())) ==
  67. sorted(modnames([os, sys])))
  68. def test_import_default_modules_with_exception(self):
  69. """ Make sure exceptions are not silenced since this step is prior to
  70. setup logging. """
  71. def trigger_exception(**kwargs):
  72. raise ImportError('Dummy ImportError')
  73. from celery.signals import import_modules
  74. import_modules.connect(trigger_exception)
  75. self.app.conf.imports = ('os', 'sys')
  76. with pytest.raises(ImportError):
  77. self.loader.import_default_modules()
  78. def test_import_from_cwd_custom_imp(self):
  79. imp = Mock(name='imp')
  80. self.loader.import_from_cwd('foo', imp=imp)
  81. imp.assert_called()
  82. def test_cmdline_config_ValueError(self):
  83. with pytest.raises(ValueError):
  84. self.loader.cmdline_config_parser(['broker.port=foobar'])
  85. class test_DefaultLoader:
  86. @patch('celery.loaders.base.find_module')
  87. def test_read_configuration_not_a_package(self, find_module):
  88. find_module.side_effect = NotAPackage()
  89. l = default.Loader(app=self.app)
  90. with pytest.raises(NotAPackage):
  91. l.read_configuration(fail_silently=False)
  92. @patch('celery.loaders.base.find_module')
  93. @mock.environ('CELERY_CONFIG_MODULE', 'celeryconfig.py')
  94. def test_read_configuration_py_in_name(self, find_module):
  95. find_module.side_effect = NotAPackage()
  96. l = default.Loader(app=self.app)
  97. with pytest.raises(NotAPackage):
  98. l.read_configuration(fail_silently=False)
  99. @patch('celery.loaders.base.find_module')
  100. def test_read_configuration_importerror(self, find_module):
  101. default.C_WNOCONF = True
  102. find_module.side_effect = ImportError()
  103. l = default.Loader(app=self.app)
  104. with pytest.warns(NotConfigured):
  105. l.read_configuration(fail_silently=True)
  106. default.C_WNOCONF = False
  107. l.read_configuration(fail_silently=True)
  108. def test_read_configuration(self):
  109. from types import ModuleType
  110. class ConfigModule(ModuleType):
  111. pass
  112. configname = os.environ.get('CELERY_CONFIG_MODULE') or 'celeryconfig'
  113. celeryconfig = ConfigModule(bytes_if_py2(configname))
  114. celeryconfig.imports = ('os', 'sys')
  115. prevconfig = sys.modules.get(configname)
  116. sys.modules[configname] = celeryconfig
  117. try:
  118. l = default.Loader(app=self.app)
  119. l.find_module = Mock(name='find_module')
  120. settings = l.read_configuration(fail_silently=False)
  121. assert settings.imports == ('os', 'sys')
  122. settings = l.read_configuration(fail_silently=False)
  123. assert settings.imports == ('os', 'sys')
  124. l.on_worker_init()
  125. finally:
  126. if prevconfig:
  127. sys.modules[configname] = prevconfig
  128. def test_read_configuration_ImportError(self):
  129. sentinel = object()
  130. prev, os.environ['CELERY_CONFIG_MODULE'] = (
  131. os.environ.get('CELERY_CONFIG_MODULE', sentinel), 'daweqew.dweqw',
  132. )
  133. try:
  134. l = default.Loader(app=self.app)
  135. with pytest.raises(ImportError):
  136. l.read_configuration(fail_silently=False)
  137. l.read_configuration(fail_silently=True)
  138. finally:
  139. if prev is not sentinel:
  140. os.environ['CELERY_CONFIG_MODULE'] = prev
  141. else:
  142. os.environ.pop('CELERY_CONFIG_MODULE', None)
  143. def test_import_from_cwd(self):
  144. l = default.Loader(app=self.app)
  145. old_path = list(sys.path)
  146. try:
  147. sys.path.remove(os.getcwd())
  148. except ValueError:
  149. pass
  150. celery = sys.modules.pop('celery', None)
  151. sys.modules.pop('celery.local', None)
  152. try:
  153. assert l.import_from_cwd('celery')
  154. sys.modules.pop('celery', None)
  155. sys.modules.pop('celery.local', None)
  156. sys.path.insert(0, os.getcwd())
  157. assert l.import_from_cwd('celery')
  158. finally:
  159. sys.path = old_path
  160. sys.modules['celery'] = celery
  161. def test_unconfigured_settings(self):
  162. context_executed = [False]
  163. class _Loader(default.Loader):
  164. def find_module(self, name):
  165. raise ImportError(name)
  166. with warnings.catch_warnings(record=True):
  167. l = _Loader(app=self.app)
  168. assert not l.configured
  169. context_executed[0] = True
  170. assert context_executed[0]
  171. class test_AppLoader:
  172. def setup(self):
  173. self.loader = AppLoader(app=self.app)
  174. def test_on_worker_init(self):
  175. self.app.conf.imports = ('subprocess',)
  176. sys.modules.pop('subprocess', None)
  177. self.loader.init_worker()
  178. assert 'subprocess' in sys.modules
  179. class test_autodiscovery:
  180. def test_autodiscover_tasks(self):
  181. base._RACE_PROTECTION = True
  182. try:
  183. base.autodiscover_tasks(['foo'])
  184. finally:
  185. base._RACE_PROTECTION = False
  186. with patch('celery.loaders.base.find_related_module') as frm:
  187. base.autodiscover_tasks(['foo'])
  188. frm.assert_called()
  189. def test_find_related_module(self):
  190. with patch('importlib.import_module') as imp:
  191. with patch('imp.find_module') as find:
  192. imp.return_value = Mock()
  193. imp.return_value.__path__ = 'foo'
  194. base.find_related_module(base, 'tasks')
  195. find.side_effect = ImportError()
  196. base.find_related_module(base, 'tasks')