test_loaders.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. from __future__ import absolute_import
  2. import os
  3. import sys
  4. import warnings
  5. from celery import loaders
  6. from celery.exceptions import (
  7. NotConfigured,
  8. )
  9. from celery.loaders import base
  10. from celery.loaders import default
  11. from celery.loaders.app import AppLoader
  12. from celery.utils.imports import NotAPackage
  13. from celery.utils.mail import SendmailWarning
  14. from celery.tests.case import (
  15. AppCase, Case, Mock, depends_on_current_app, patch, with_environ,
  16. )
  17. class DummyLoader(base.BaseLoader):
  18. def read_configuration(self):
  19. return {'foo': 'bar', 'imports': ('os', 'sys')}
  20. class test_loaders(AppCase):
  21. def test_get_loader_cls(self):
  22. self.assertEqual(loaders.get_loader_cls('default'),
  23. default.Loader)
  24. @depends_on_current_app
  25. def test_current_loader(self):
  26. with self.assertPendingDeprecation():
  27. self.assertIs(loaders.current_loader(), self.app.loader)
  28. @depends_on_current_app
  29. def test_load_settings(self):
  30. with self.assertPendingDeprecation():
  31. self.assertIs(loaders.load_settings(), self.app.conf)
  32. class test_LoaderBase(AppCase):
  33. message_options = {'subject': 'Subject',
  34. 'body': 'Body',
  35. 'sender': 'x@x.com',
  36. 'to': 'y@x.com'}
  37. server_options = {'host': 'smtp.x.com',
  38. 'port': 1234,
  39. 'user': 'x',
  40. 'password': 'qwerty',
  41. 'timeout': 3}
  42. def setup(self):
  43. self.loader = DummyLoader(app=self.app)
  44. def test_handlers_pass(self):
  45. self.loader.on_task_init('foo.task', 'feedface-cafebabe')
  46. self.loader.on_worker_init()
  47. def test_now(self):
  48. self.assertTrue(self.loader.now(utc=True))
  49. self.assertTrue(self.loader.now(utc=False))
  50. def test_read_configuration_no_env(self):
  51. self.assertIsNone(
  52. base.BaseLoader(app=self.app).read_configuration(
  53. 'FOO_X_S_WE_WQ_Q_WE'),
  54. )
  55. def test_autodiscovery(self):
  56. with patch('celery.loaders.base.autodiscover_tasks') as auto:
  57. auto.return_value = [Mock()]
  58. auto.return_value[0].__name__ = 'moo'
  59. self.loader.autodiscover_tasks(['A', 'B'])
  60. self.assertIn('moo', self.loader.task_modules)
  61. self.loader.task_modules.discard('moo')
  62. def test_import_task_module(self):
  63. self.assertEqual(sys, self.loader.import_task_module('sys'))
  64. def test_init_worker_process(self):
  65. self.loader.on_worker_process_init()
  66. m = self.loader.on_worker_process_init = Mock()
  67. self.loader.init_worker_process()
  68. m.assert_called_with()
  69. def test_config_from_object_module(self):
  70. self.loader.import_from_cwd = Mock()
  71. self.loader.config_from_object('module_name')
  72. self.loader.import_from_cwd.assert_called_with('module_name')
  73. def test_conf_property(self):
  74. self.assertEqual(self.loader.conf['foo'], 'bar')
  75. self.assertEqual(self.loader._conf['foo'], 'bar')
  76. self.assertEqual(self.loader.conf['foo'], 'bar')
  77. def test_import_default_modules(self):
  78. def modnames(l):
  79. return [m.__name__ for m in l]
  80. self.app.conf.imports = ('os', 'sys')
  81. self.assertEqual(
  82. sorted(modnames(self.loader.import_default_modules())),
  83. sorted(modnames([os, sys])),
  84. )
  85. def test_import_from_cwd_custom_imp(self):
  86. def imp(module, package=None):
  87. imp.called = True
  88. imp.called = False
  89. self.loader.import_from_cwd('foo', imp=imp)
  90. self.assertTrue(imp.called)
  91. @patch('celery.utils.mail.Mailer._send')
  92. def test_mail_admins_errors(self, send):
  93. send.side_effect = KeyError()
  94. opts = dict(self.message_options, **self.server_options)
  95. with self.assertWarnsRegex(SendmailWarning, r'KeyError'):
  96. self.loader.mail_admins(fail_silently=True, **opts)
  97. with self.assertRaises(KeyError):
  98. self.loader.mail_admins(fail_silently=False, **opts)
  99. @patch('celery.utils.mail.Mailer._send')
  100. def test_mail_admins(self, send):
  101. opts = dict(self.message_options, **self.server_options)
  102. self.loader.mail_admins(**opts)
  103. self.assertTrue(send.call_args)
  104. message = send.call_args[0][0]
  105. self.assertEqual(message.to, [self.message_options['to']])
  106. self.assertEqual(message.subject, self.message_options['subject'])
  107. self.assertEqual(message.sender, self.message_options['sender'])
  108. self.assertEqual(message.body, self.message_options['body'])
  109. def test_mail_attribute(self):
  110. from celery.utils import mail
  111. loader = base.BaseLoader(app=self.app)
  112. self.assertIs(loader.mail, mail)
  113. def test_cmdline_config_ValueError(self):
  114. with self.assertRaises(ValueError):
  115. self.loader.cmdline_config_parser(['broker.port=foobar'])
  116. class test_DefaultLoader(AppCase):
  117. @patch('celery.loaders.base.find_module')
  118. def test_read_configuration_not_a_package(self, find_module):
  119. find_module.side_effect = NotAPackage()
  120. l = default.Loader(app=self.app)
  121. with self.assertRaises(NotAPackage):
  122. l.read_configuration(fail_silently=False)
  123. @patch('celery.loaders.base.find_module')
  124. @with_environ('CELERY_CONFIG_MODULE', 'celeryconfig.py')
  125. def test_read_configuration_py_in_name(self, find_module):
  126. find_module.side_effect = NotAPackage()
  127. l = default.Loader(app=self.app)
  128. with self.assertRaises(NotAPackage):
  129. l.read_configuration(fail_silently=False)
  130. @patch('celery.loaders.base.find_module')
  131. def test_read_configuration_importerror(self, find_module):
  132. default.C_WNOCONF = True
  133. find_module.side_effect = ImportError()
  134. l = default.Loader(app=self.app)
  135. with self.assertWarnsRegex(NotConfigured, r'make sure it exists'):
  136. l.read_configuration(fail_silently=True)
  137. default.C_WNOCONF = False
  138. l.read_configuration(fail_silently=True)
  139. def test_read_configuration(self):
  140. from types import ModuleType
  141. class ConfigModule(ModuleType):
  142. pass
  143. configname = os.environ.get('CELERY_CONFIG_MODULE') or 'celeryconfig'
  144. celeryconfig = ConfigModule(configname)
  145. celeryconfig.imports = ('os', 'sys')
  146. prevconfig = sys.modules.get(configname)
  147. sys.modules[configname] = celeryconfig
  148. try:
  149. l = default.Loader(app=self.app)
  150. l.find_module = Mock(name='find_module')
  151. settings = l.read_configuration(fail_silently=False)
  152. self.assertTupleEqual(settings.imports, ('os', 'sys'))
  153. settings = l.read_configuration(fail_silently=False)
  154. self.assertTupleEqual(settings.imports, ('os', 'sys'))
  155. l.on_worker_init()
  156. finally:
  157. if prevconfig:
  158. sys.modules[configname] = prevconfig
  159. def test_import_from_cwd(self):
  160. l = default.Loader(app=self.app)
  161. old_path = list(sys.path)
  162. try:
  163. sys.path.remove(os.getcwd())
  164. except ValueError:
  165. pass
  166. celery = sys.modules.pop('celery', None)
  167. sys.modules.pop('celery.five', None)
  168. try:
  169. self.assertTrue(l.import_from_cwd('celery'))
  170. sys.modules.pop('celery', None)
  171. sys.modules.pop('celery.five', None)
  172. sys.path.insert(0, os.getcwd())
  173. self.assertTrue(l.import_from_cwd('celery'))
  174. finally:
  175. sys.path = old_path
  176. sys.modules['celery'] = celery
  177. def test_unconfigured_settings(self):
  178. context_executed = [False]
  179. class _Loader(default.Loader):
  180. def find_module(self, name):
  181. raise ImportError(name)
  182. with warnings.catch_warnings(record=True):
  183. l = _Loader(app=self.app)
  184. self.assertFalse(l.configured)
  185. context_executed[0] = True
  186. self.assertTrue(context_executed[0])
  187. class test_AppLoader(AppCase):
  188. def setup(self):
  189. self.loader = AppLoader(app=self.app)
  190. def test_on_worker_init(self):
  191. self.app.conf.imports = ('subprocess',)
  192. sys.modules.pop('subprocess', None)
  193. self.loader.init_worker()
  194. self.assertIn('subprocess', sys.modules)
  195. class test_autodiscovery(Case):
  196. def test_autodiscover_tasks(self):
  197. base._RACE_PROTECTION = True
  198. try:
  199. base.autodiscover_tasks(['foo'])
  200. finally:
  201. base._RACE_PROTECTION = False
  202. with patch('celery.loaders.base.find_related_module') as frm:
  203. base.autodiscover_tasks(['foo'])
  204. self.assertTrue(frm.called)
  205. def test_find_related_module(self):
  206. with patch('importlib.import_module') as imp:
  207. with patch('imp.find_module') as find:
  208. imp.return_value = Mock()
  209. imp.return_value.__path__ = 'foo'
  210. base.find_related_module(base, 'tasks')
  211. def se1(val):
  212. imp.side_effect = AttributeError()
  213. imp.side_effect = se1
  214. base.find_related_module(base, 'tasks')
  215. imp.side_effect = None
  216. find.side_effect = ImportError()
  217. base.find_related_module(base, 'tasks')