test_loaders.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. from __future__ import absolute_import, unicode_literals
  2. import os
  3. import pytest
  4. import sys
  5. import warnings
  6. from case import Mock, mock, patch
  7. from celery import loaders
  8. from celery.exceptions import NotConfigured
  9. from celery.five import bytes_if_py2
  10. from celery.loaders import base
  11. from celery.loaders import default
  12. from celery.loaders.app import AppLoader
  13. from celery.utils.imports import NotAPackage
  14. class DummyLoader(base.BaseLoader):
  15. def read_configuration(self):
  16. return {'foo': 'bar', 'imports': ('os', 'sys')}
  17. class test_loaders:
  18. def test_get_loader_cls(self):
  19. assert loaders.get_loader_cls('default') is default.Loader
  20. class test_LoaderBase:
  21. message_options = {'subject': 'Subject',
  22. 'body': 'Body',
  23. 'sender': 'x@x.com',
  24. 'to': 'y@x.com'}
  25. server_options = {'host': 'smtp.x.com',
  26. 'port': 1234,
  27. 'user': 'x',
  28. 'password': 'qwerty',
  29. 'timeout': 3}
  30. def setup(self):
  31. self.loader = DummyLoader(app=self.app)
  32. def test_handlers_pass(self):
  33. self.loader.on_task_init('foo.task', 'feedface-cafebabe')
  34. self.loader.on_worker_init()
  35. def test_now(self):
  36. assert self.loader.now(utc=True)
  37. assert self.loader.now(utc=False)
  38. def test_read_configuration_no_env(self):
  39. assert base.BaseLoader(app=self.app).read_configuration(
  40. 'FOO_X_S_WE_WQ_Q_WE') is None
  41. def test_autodiscovery(self):
  42. with patch('celery.loaders.base.autodiscover_tasks') as auto:
  43. auto.return_value = [Mock()]
  44. auto.return_value[0].__name__ = 'moo'
  45. self.loader.autodiscover_tasks(['A', 'B'])
  46. assert 'moo' in self.loader.task_modules
  47. self.loader.task_modules.discard('moo')
  48. def test_import_task_module(self):
  49. assert sys == self.loader.import_task_module('sys')
  50. def test_init_worker_process(self):
  51. self.loader.on_worker_process_init()
  52. m = self.loader.on_worker_process_init = Mock()
  53. self.loader.init_worker_process()
  54. m.assert_called_with()
  55. def test_config_from_object_module(self):
  56. self.loader.import_from_cwd = Mock()
  57. self.loader.config_from_object('module_name')
  58. self.loader.import_from_cwd.assert_called_with('module_name')
  59. def test_conf_property(self):
  60. assert self.loader.conf['foo'] == 'bar'
  61. assert self.loader._conf['foo'] == 'bar'
  62. assert self.loader.conf['foo'] == 'bar'
  63. def test_import_default_modules(self):
  64. def modnames(l):
  65. return [m.__name__ for m in l]
  66. self.app.conf.imports = ('os', 'sys')
  67. assert (sorted(modnames(self.loader.import_default_modules())) ==
  68. sorted(modnames([os, sys])))
  69. def test_import_from_cwd_custom_imp(self):
  70. imp = Mock(name='imp')
  71. self.loader.import_from_cwd('foo', imp=imp)
  72. imp.assert_called()
  73. def test_cmdline_config_ValueError(self):
  74. with pytest.raises(ValueError):
  75. self.loader.cmdline_config_parser(['broker.port=foobar'])
  76. class test_DefaultLoader:
  77. @patch('celery.loaders.base.find_module')
  78. def test_read_configuration_not_a_package(self, find_module):
  79. find_module.side_effect = NotAPackage()
  80. l = default.Loader(app=self.app)
  81. with pytest.raises(NotAPackage):
  82. l.read_configuration(fail_silently=False)
  83. @patch('celery.loaders.base.find_module')
  84. @mock.environ('CELERY_CONFIG_MODULE', 'celeryconfig.py')
  85. def test_read_configuration_py_in_name(self, find_module):
  86. find_module.side_effect = NotAPackage()
  87. l = default.Loader(app=self.app)
  88. with pytest.raises(NotAPackage):
  89. l.read_configuration(fail_silently=False)
  90. @patch('celery.loaders.base.find_module')
  91. def test_read_configuration_importerror(self, find_module):
  92. default.C_WNOCONF = True
  93. find_module.side_effect = ImportError()
  94. l = default.Loader(app=self.app)
  95. with pytest.warns(NotConfigured):
  96. l.read_configuration(fail_silently=True)
  97. default.C_WNOCONF = False
  98. l.read_configuration(fail_silently=True)
  99. def test_read_configuration(self):
  100. from types import ModuleType
  101. class ConfigModule(ModuleType):
  102. pass
  103. configname = os.environ.get('CELERY_CONFIG_MODULE') or 'celeryconfig'
  104. celeryconfig = ConfigModule(bytes_if_py2(configname))
  105. celeryconfig.imports = ('os', 'sys')
  106. prevconfig = sys.modules.get(configname)
  107. sys.modules[configname] = celeryconfig
  108. try:
  109. l = default.Loader(app=self.app)
  110. l.find_module = Mock(name='find_module')
  111. settings = l.read_configuration(fail_silently=False)
  112. assert settings.imports == ('os', 'sys')
  113. settings = l.read_configuration(fail_silently=False)
  114. assert settings.imports == ('os', 'sys')
  115. l.on_worker_init()
  116. finally:
  117. if prevconfig:
  118. sys.modules[configname] = prevconfig
  119. def test_read_configuration_ImportError(self):
  120. sentinel = object()
  121. prev, os.environ['CELERY_CONFIG_MODULE'] = (
  122. os.environ.get('CELERY_CONFIG_MODULE', sentinel), 'daweqew.dweqw',
  123. )
  124. try:
  125. l = default.Loader(app=self.app)
  126. with pytest.raises(ImportError):
  127. l.read_configuration(fail_silently=False)
  128. l.read_configuration(fail_silently=True)
  129. finally:
  130. if prev is not sentinel:
  131. os.environ['CELERY_CONFIG_MODULE'] = prev
  132. else:
  133. os.environ.pop('CELERY_CONFIG_MODULE', None)
  134. def test_import_from_cwd(self):
  135. l = default.Loader(app=self.app)
  136. old_path = list(sys.path)
  137. try:
  138. sys.path.remove(os.getcwd())
  139. except ValueError:
  140. pass
  141. celery = sys.modules.pop('celery', None)
  142. sys.modules.pop('celery.local', None)
  143. try:
  144. assert l.import_from_cwd('celery')
  145. sys.modules.pop('celery', None)
  146. sys.modules.pop('celery.local', None)
  147. sys.path.insert(0, os.getcwd())
  148. assert l.import_from_cwd('celery')
  149. finally:
  150. sys.path = old_path
  151. sys.modules['celery'] = celery
  152. def test_unconfigured_settings(self):
  153. context_executed = [False]
  154. class _Loader(default.Loader):
  155. def find_module(self, name):
  156. raise ImportError(name)
  157. with warnings.catch_warnings(record=True):
  158. l = _Loader(app=self.app)
  159. assert not l.configured
  160. context_executed[0] = True
  161. assert context_executed[0]
  162. class test_AppLoader:
  163. def setup(self):
  164. self.loader = AppLoader(app=self.app)
  165. def test_on_worker_init(self):
  166. self.app.conf.imports = ('subprocess',)
  167. sys.modules.pop('subprocess', None)
  168. self.loader.init_worker()
  169. assert 'subprocess' in sys.modules
  170. class test_autodiscovery:
  171. def test_autodiscover_tasks(self):
  172. base._RACE_PROTECTION = True
  173. try:
  174. base.autodiscover_tasks(['foo'])
  175. finally:
  176. base._RACE_PROTECTION = False
  177. with patch('celery.loaders.base.find_related_module') as frm:
  178. base.autodiscover_tasks(['foo'])
  179. frm.assert_called()
  180. def test_find_related_module(self):
  181. with patch('importlib.import_module') as imp:
  182. with patch('imp.find_module') as find:
  183. imp.return_value = Mock()
  184. imp.return_value.__path__ = 'foo'
  185. base.find_related_module(base, 'tasks')
  186. def se1(val):
  187. imp.side_effect = AttributeError()
  188. imp.side_effect = se1
  189. base.find_related_module(base, 'tasks')
  190. imp.side_effect = None
  191. find.side_effect = ImportError()
  192. base.find_related_module(base, 'tasks')