test_loaders.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. from __future__ import absolute_import, unicode_literals
  2. import os
  3. import sys
  4. import warnings
  5. from celery import loaders
  6. from celery.exceptions import NotConfigured
  7. from celery.loaders import base
  8. from celery.loaders import default
  9. from celery.loaders.app import AppLoader
  10. from celery.utils.imports import NotAPackage
  11. from celery.tests.case import AppCase, Case, Mock, mock, patch
  class DummyLoader(base.BaseLoader):
      """Loader stub whose configuration is a fixed, in-memory mapping."""

      def read_configuration(self):
          # Hard-coded settings: one plain key plus an ``imports`` tuple.
          settings = {}
          settings['foo'] = 'bar'
          settings['imports'] = ('os', 'sys')
          return settings
  class test_loaders(AppCase):
      """Checks the loader-class lookup exposed by ``celery.loaders``."""

      def test_get_loader_cls(self):
          # The 'default' alias must resolve to ``default.Loader``.
          resolved = loaders.get_loader_cls('default')
          self.assertEqual(resolved, default.Loader)
  class test_LoaderBase(AppCase):
      """Exercises ``base.BaseLoader`` behaviour through ``DummyLoader``."""

      # Fixture mappings; no test visible in this class reads them.
      message_options = {
          'subject': 'Subject',
          'body': 'Body',
          'sender': 'x@x.com',
          'to': 'y@x.com',
      }
      server_options = {
          'host': 'smtp.x.com',
          'port': 1234,
          'user': 'x',
          'password': 'qwerty',
          'timeout': 3,
      }

      def setup(self):
          self.loader = DummyLoader(app=self.app)

      def test_handlers_pass(self):
          # Smoke test: both hooks only need to run without raising.
          loader = self.loader
          loader.on_task_init('foo.task', 'feedface-cafebabe')
          loader.on_worker_init()

      def test_now(self):
          # ``now`` must return a truthy value for both ``utc`` settings.
          for use_utc in (True, False):
              self.assertTrue(self.loader.now(utc=use_utc))

      def test_read_configuration_no_env(self):
          plain_loader = base.BaseLoader(app=self.app)
          result = plain_loader.read_configuration('FOO_X_S_WE_WQ_Q_WE')
          self.assertIsNone(result)

      def test_autodiscovery(self):
          with patch('celery.loaders.base.autodiscover_tasks') as auto:
              # The patched discovery reports a single module named 'moo'.
              fake_module = Mock()
              fake_module.__name__ = 'moo'
              auto.return_value = [fake_module]

              self.loader.autodiscover_tasks(['A', 'B'])

              registered = self.loader.task_modules
              self.assertIn('moo', registered)
              registered.discard('moo')

      def test_import_task_module(self):
          imported = self.loader.import_task_module('sys')
          self.assertEqual(sys, imported)

      def test_init_worker_process(self):
          # First call the real hook, then swap in a mock to observe that
          # ``init_worker_process`` invokes it without arguments.
          self.loader.on_worker_process_init()
          hook = Mock()
          self.loader.on_worker_process_init = hook
          self.loader.init_worker_process()
          hook.assert_called_with()

      def test_config_from_object_module(self):
          importer = Mock()
          self.loader.import_from_cwd = importer
          self.loader.config_from_object('module_name')
          importer.assert_called_with('module_name')

      def test_conf_property(self):
          # Read through the property, then the backing attribute, then the
          # property again; each must expose the stub's 'foo' setting.
          for attribute in ('conf', '_conf', 'conf'):
              mapping = getattr(self.loader, attribute)
              self.assertEqual(mapping['foo'], 'bar')

      def test_import_default_modules(self):
          self.app.conf.imports = ('os', 'sys')
          imported_names = sorted(
              module.__name__
              for module in self.loader.import_default_modules()
          )
          expected_names = sorted(module.__name__ for module in (os, sys))
          self.assertEqual(imported_names, expected_names)

      def test_import_from_cwd_custom_imp(self):
          custom_importer = Mock(name='imp')
          self.loader.import_from_cwd('foo', imp=custom_importer)
          custom_importer.assert_called()

      def test_cmdline_config_ValueError(self):
          self.assertRaises(
              ValueError,
              self.loader.cmdline_config_parser,
              ['broker.port=foobar'],
          )
  class test_DefaultLoader(AppCase):
      """Tests for ``default.Loader`` configuration reading and importing.

      Tests that touch process-wide state (``sys.modules``, ``sys.path``,
      ``os.environ``, ``default.C_WNOCONF``) restore what they found, so a
      failure here does not leak into the rest of the run.
      """

      @patch('celery.loaders.base.find_module')
      def test_read_configuration_not_a_package(self, find_module):
          find_module.side_effect = NotAPackage()
          loader = default.Loader(app=self.app)
          with self.assertRaises(NotAPackage):
              loader.read_configuration(fail_silently=False)

      @patch('celery.loaders.base.find_module')
      @mock.environ('CELERY_CONFIG_MODULE', 'celeryconfig.py')
      def test_read_configuration_py_in_name(self, find_module):
          find_module.side_effect = NotAPackage()
          loader = default.Loader(app=self.app)
          with self.assertRaises(NotAPackage):
              loader.read_configuration(fail_silently=False)

      @patch('celery.loaders.base.find_module')
      def test_read_configuration_importerror(self, find_module):
          # Remember the module-level flag so it is restored even when an
          # assertion below fails.
          previous_flag = default.C_WNOCONF
          try:
              default.C_WNOCONF = True
              find_module.side_effect = ImportError()
              loader = default.Loader(app=self.app)
              with self.assertWarnsRegex(NotConfigured,
                                         r'make sure it exists'):
                  loader.read_configuration(fail_silently=True)
              # With the flag off the call is only exercised, not asserted.
              default.C_WNOCONF = False
              loader.read_configuration(fail_silently=True)
          finally:
              default.C_WNOCONF = previous_flag

      def test_read_configuration(self):
          from types import ModuleType

          class ConfigModule(ModuleType):
              pass

          configname = (os.environ.get('CELERY_CONFIG_MODULE') or
                        'celeryconfig')
          celeryconfig = ConfigModule(configname)
          celeryconfig.imports = ('os', 'sys')

          prevconfig = sys.modules.get(configname)
          sys.modules[configname] = celeryconfig
          try:
              loader = default.Loader(app=self.app)
              loader.find_module = Mock(name='find_module')
              # Read twice; both reads must report the fake module's imports.
              for _ in range(2):
                  settings = loader.read_configuration(fail_silently=False)
                  self.assertTupleEqual(settings.imports, ('os', 'sys'))
              loader.on_worker_init()
          finally:
              if prevconfig is not None:
                  sys.modules[configname] = prevconfig
              else:
                  # Nothing was registered before: drop the fake module
                  # instead of leaving it behind in sys.modules.
                  sys.modules.pop(configname, None)

      def test_read_configuration_ImportError(self):
          sentinel = object()
          prev = os.environ.get('CELERY_CONFIG_MODULE', sentinel)
          os.environ['CELERY_CONFIG_MODULE'] = 'daweqew.dweqw'
          try:
              loader = default.Loader(app=self.app)
              with self.assertRaises(ImportError):
                  loader.read_configuration(fail_silently=False)
              # The silent variant only has to return without raising.
              loader.read_configuration(fail_silently=True)
          finally:
              if prev is not sentinel:
                  os.environ['CELERY_CONFIG_MODULE'] = prev
              else:
                  os.environ.pop('CELERY_CONFIG_MODULE', None)

      def test_import_from_cwd(self):
          loader = default.Loader(app=self.app)
          module_names = ('celery', 'celery.five')
          old_path = list(sys.path)
          # Detach the currently loaded modules so the loader really imports.
          saved_modules = {
              name: sys.modules.pop(name, None) for name in module_names
          }
          try:
              try:
                  sys.path.remove(os.getcwd())
              except ValueError:
                  pass  # cwd was not on sys.path to begin with
              self.assertTrue(loader.import_from_cwd('celery'))

              for name in module_names:
                  sys.modules.pop(name, None)
              sys.path.insert(0, os.getcwd())
              self.assertTrue(loader.import_from_cwd('celery'))
          finally:
              sys.path = old_path
              # Put back only what was loaded before; never store ``None``
              # in sys.modules, which would block later imports of the name.
              for name, module in saved_modules.items():
                  if module is not None:
                      sys.modules[name] = module

      def test_unconfigured_settings(self):
          context_executed = [False]

          class _Loader(default.Loader):

              def find_module(self, name):
                  raise ImportError(name)

          with warnings.catch_warnings(record=True):
              loader = _Loader(app=self.app)
              self.assertFalse(loader.configured)
              context_executed[0] = True
          # Guards against the ``with`` body having been skipped.
          self.assertTrue(context_executed[0])
  class test_AppLoader(AppCase):
      """Tests for ``AppLoader`` worker initialisation."""

      def setup(self):
          self.loader = AppLoader(app=self.app)

      def test_on_worker_init(self):
          self.app.conf.imports = ('subprocess',)
          # Remove the module so the assertion below can only pass if
          # ``init_worker`` imported it again.
          original = sys.modules.pop('subprocess', None)
          try:
              self.loader.init_worker()
              self.assertIn('subprocess', sys.modules)
          finally:
              # Reinstate the module object that was loaded before the test
              # so the process does not end up with two copies of it.
              if original is not None:
                  sys.modules['subprocess'] = original
  class test_autodiscovery(Case):
      """Covers the module-level discovery helpers in ``celery.loaders.base``."""

      def test_autodiscover_tasks(self):
          # First run with the race-protection flag raised, then reset it.
          base._RACE_PROTECTION = True
          try:
              base.autodiscover_tasks(['foo'])
          finally:
              base._RACE_PROTECTION = False

          # With the flag cleared, the related-module lookup must be used.
          with patch('celery.loaders.base.find_related_module') as lookup:
              base.autodiscover_tasks(['foo'])
              lookup.assert_called()

      def test_find_related_module(self):
          with patch('importlib.import_module') as import_module, \
                  patch('imp.find_module') as find_module:
              # Case 1: the import yields an object that has ``__path__``.
              package = Mock()
              package.__path__ = 'foo'
              import_module.return_value = package
              base.find_related_module(base, 'tasks')

              # Case 2: the first import call succeeds and arms an
              # AttributeError for the next call.
              def arm_attribute_error(val):
                  import_module.side_effect = AttributeError()

              import_module.side_effect = arm_attribute_error
              base.find_related_module(base, 'tasks')
              import_module.side_effect = None

              # Case 3: the module finder raises ImportError.
              find_module.side_effect = ImportError()
              base.find_related_module(base, 'tasks')
  193. imp.side_effect = None
  194. find.side_effect = ImportError()
  195. base.find_related_module(base, 'tasks')