test_loaders.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. import os
  2. import pytest
  3. import sys
  4. import warnings
  5. from case import Mock, mock, patch
  6. from celery import loaders
  7. from celery.exceptions import NotConfigured
  8. from celery.loaders import base
  9. from celery.loaders import default
  10. from celery.loaders.app import AppLoader
  11. from celery.utils.imports import NotAPackage
  12. class DummyLoader(base.BaseLoader):
  13. def read_configuration(self):
  14. return {'foo': 'bar', 'imports': ('os', 'sys')}
  15. class test_loaders:
  16. def test_get_loader_cls(self):
  17. assert loaders.get_loader_cls('default') is default.Loader
  18. class test_LoaderBase:
  19. message_options = {'subject': 'Subject',
  20. 'body': 'Body',
  21. 'sender': 'x@x.com',
  22. 'to': 'y@x.com'}
  23. server_options = {'host': 'smtp.x.com',
  24. 'port': 1234,
  25. 'user': 'x',
  26. 'password': 'qwerty',
  27. 'timeout': 3}
  28. def setup(self):
  29. self.loader = DummyLoader(app=self.app)
  30. def test_handlers_pass(self):
  31. self.loader.on_task_init('foo.task', 'feedface-cafebabe')
  32. self.loader.on_worker_init()
  33. def test_now(self):
  34. assert self.loader.now(utc=True)
  35. assert self.loader.now(utc=False)
  36. def test_read_configuration_no_env(self):
  37. assert base.BaseLoader(app=self.app).read_configuration(
  38. 'FOO_X_S_WE_WQ_Q_WE') is None
  39. def test_autodiscovery(self):
  40. with patch('celery.loaders.base.autodiscover_tasks') as auto:
  41. auto.return_value = [Mock()]
  42. auto.return_value[0].__name__ = 'moo'
  43. self.loader.autodiscover_tasks(['A', 'B'])
  44. assert 'moo' in self.loader.task_modules
  45. self.loader.task_modules.discard('moo')
  46. def test_import_task_module(self):
  47. assert sys == self.loader.import_task_module('sys')
  48. def test_init_worker_process(self):
  49. self.loader.on_worker_process_init()
  50. m = self.loader.on_worker_process_init = Mock()
  51. self.loader.init_worker_process()
  52. m.assert_called_with()
  53. def test_config_from_object_module(self):
  54. self.loader.import_from_cwd = Mock()
  55. self.loader.config_from_object('module_name')
  56. self.loader.import_from_cwd.assert_called_with('module_name')
  57. def test_conf_property(self):
  58. assert self.loader.conf['foo'] == 'bar'
  59. assert self.loader._conf['foo'] == 'bar'
  60. assert self.loader.conf['foo'] == 'bar'
  61. def test_import_default_modules(self):
  62. def modnames(l):
  63. return [m.__name__ for m in l]
  64. self.app.conf.imports = ('os', 'sys')
  65. assert (sorted(modnames(self.loader.import_default_modules())) ==
  66. sorted(modnames([os, sys])))
  67. def test_import_from_cwd_custom_imp(self):
  68. imp = Mock(name='imp')
  69. self.loader.import_from_cwd('foo', imp=imp)
  70. imp.assert_called()
  71. def test_cmdline_config_ValueError(self):
  72. with pytest.raises(ValueError):
  73. self.loader.cmdline_config_parser(['broker.port=foobar'])
  74. class test_DefaultLoader:
  75. @patch('celery.loaders.base.find_module')
  76. def test_read_configuration_not_a_package(self, find_module):
  77. find_module.side_effect = NotAPackage()
  78. l = default.Loader(app=self.app)
  79. with pytest.raises(NotAPackage):
  80. l.read_configuration(fail_silently=False)
  81. @patch('celery.loaders.base.find_module')
  82. @mock.environ('CELERY_CONFIG_MODULE', 'celeryconfig.py')
  83. def test_read_configuration_py_in_name(self, find_module):
  84. find_module.side_effect = NotAPackage()
  85. l = default.Loader(app=self.app)
  86. with pytest.raises(NotAPackage):
  87. l.read_configuration(fail_silently=False)
  88. @patch('celery.loaders.base.find_module')
  89. def test_read_configuration_importerror(self, find_module):
  90. default.C_WNOCONF = True
  91. find_module.side_effect = ImportError()
  92. l = default.Loader(app=self.app)
  93. with pytest.warns(NotConfigured):
  94. l.read_configuration(fail_silently=True)
  95. default.C_WNOCONF = False
  96. l.read_configuration(fail_silently=True)
  97. def test_read_configuration(self):
  98. from types import ModuleType
  99. class ConfigModule(ModuleType):
  100. pass
  101. configname = os.environ.get('CELERY_CONFIG_MODULE') or 'celeryconfig'
  102. celeryconfig = ConfigModule(configname)
  103. celeryconfig.imports = ('os', 'sys')
  104. prevconfig = sys.modules.get(configname)
  105. sys.modules[configname] = celeryconfig
  106. try:
  107. l = default.Loader(app=self.app)
  108. l.find_module = Mock(name='find_module')
  109. settings = l.read_configuration(fail_silently=False)
  110. assert settings.imports == ('os', 'sys')
  111. settings = l.read_configuration(fail_silently=False)
  112. assert settings.imports == ('os', 'sys')
  113. l.on_worker_init()
  114. finally:
  115. if prevconfig:
  116. sys.modules[configname] = prevconfig
  117. def test_read_configuration_ImportError(self):
  118. sentinel = object()
  119. prev, os.environ['CELERY_CONFIG_MODULE'] = (
  120. os.environ.get('CELERY_CONFIG_MODULE', sentinel), 'daweqew.dweqw',
  121. )
  122. try:
  123. l = default.Loader(app=self.app)
  124. with pytest.raises(ImportError):
  125. l.read_configuration(fail_silently=False)
  126. l.read_configuration(fail_silently=True)
  127. finally:
  128. if prev is not sentinel:
  129. os.environ['CELERY_CONFIG_MODULE'] = prev
  130. else:
  131. os.environ.pop('CELERY_CONFIG_MODULE', None)
  132. def test_import_from_cwd(self):
  133. l = default.Loader(app=self.app)
  134. old_path = list(sys.path)
  135. try:
  136. sys.path.remove(os.getcwd())
  137. except ValueError:
  138. pass
  139. celery = sys.modules.pop('celery', None)
  140. sys.modules.pop('celery.local', None)
  141. try:
  142. assert l.import_from_cwd('celery')
  143. sys.modules.pop('celery', None)
  144. sys.modules.pop('celery.local', None)
  145. sys.path.insert(0, os.getcwd())
  146. assert l.import_from_cwd('celery')
  147. finally:
  148. sys.path = old_path
  149. sys.modules['celery'] = celery
  150. def test_unconfigured_settings(self):
  151. context_executed = [False]
  152. class _Loader(default.Loader):
  153. def find_module(self, name):
  154. raise ImportError(name)
  155. with warnings.catch_warnings(record=True):
  156. l = _Loader(app=self.app)
  157. assert not l.configured
  158. context_executed[0] = True
  159. assert context_executed[0]
  160. class test_AppLoader:
  161. def setup(self):
  162. self.loader = AppLoader(app=self.app)
  163. def test_on_worker_init(self):
  164. self.app.conf.imports = ('subprocess',)
  165. sys.modules.pop('subprocess', None)
  166. self.loader.init_worker()
  167. assert 'subprocess' in sys.modules
  168. class test_autodiscovery:
  169. def test_autodiscover_tasks(self):
  170. base._RACE_PROTECTION = True
  171. try:
  172. base.autodiscover_tasks(['foo'])
  173. finally:
  174. base._RACE_PROTECTION = False
  175. with patch('celery.loaders.base.find_related_module') as frm:
  176. base.autodiscover_tasks(['foo'])
  177. frm.assert_called()
  178. def test_find_related_module(self):
  179. with patch('importlib.import_module') as imp:
  180. with patch('imp.find_module') as find:
  181. imp.return_value = Mock()
  182. imp.return_value.__path__ = 'foo'
  183. base.find_related_module(base, 'tasks')
  184. def se1(val):
  185. imp.side_effect = AttributeError()
  186. imp.side_effect = se1
  187. base.find_related_module(base, 'tasks')
  188. imp.side_effect = None
  189. find.side_effect = ImportError()
  190. base.find_related_module(base, 'tasks')