test_loaders.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. from __future__ import absolute_import, unicode_literals
  2. import os
  3. import sys
  4. import warnings
  5. from celery import loaders
  6. from celery.exceptions import NotConfigured
  7. from celery.five import module_name_t
  8. from celery.loaders import base
  9. from celery.loaders import default
  10. from celery.loaders.app import AppLoader
  11. from celery.utils.imports import NotAPackage
  12. from celery.utils.mail import SendmailWarning
  13. from celery.tests.case import AppCase, Case, Mock, mock_environ, patch
  14. class DummyLoader(base.BaseLoader):
  15. def read_configuration(self):
  16. return {'foo': 'bar', 'imports': ('os', 'sys')}
  17. class test_loaders(AppCase):
  18. def test_get_loader_cls(self):
  19. self.assertEqual(loaders.get_loader_cls('default'),
  20. default.Loader)
  21. class test_LoaderBase(AppCase):
  22. message_options = {'subject': 'Subject',
  23. 'body': 'Body',
  24. 'sender': 'x@x.com',
  25. 'to': 'y@x.com'}
  26. server_options = {'host': 'smtp.x.com',
  27. 'port': 1234,
  28. 'user': 'x',
  29. 'password': 'qwerty',
  30. 'timeout': 3}
  31. def setup(self):
  32. self.loader = DummyLoader(app=self.app)
  33. def test_handlers_pass(self):
  34. self.loader.on_task_init('foo.task', 'feedface-cafebabe')
  35. self.loader.on_worker_init()
  36. def test_now(self):
  37. self.assertTrue(self.loader.now(utc=True))
  38. self.assertTrue(self.loader.now(utc=False))
  39. def test_read_configuration_no_env(self):
  40. self.assertIsNone(
  41. base.BaseLoader(app=self.app).read_configuration(
  42. 'FOO_X_S_WE_WQ_Q_WE'),
  43. )
  44. def test_autodiscovery(self):
  45. with patch('celery.loaders.base.autodiscover_tasks') as auto:
  46. auto.return_value = [Mock()]
  47. auto.return_value[0].__name__ = 'moo'
  48. self.loader.autodiscover_tasks(['A', 'B'])
  49. self.assertIn('moo', self.loader.task_modules)
  50. self.loader.task_modules.discard('moo')
  51. def test_import_task_module(self):
  52. self.assertEqual(sys, self.loader.import_task_module('sys'))
  53. def test_init_worker_process(self):
  54. self.loader.on_worker_process_init()
  55. m = self.loader.on_worker_process_init = Mock()
  56. self.loader.init_worker_process()
  57. m.assert_called_with()
  58. def test_config_from_object_module(self):
  59. self.loader.import_from_cwd = Mock()
  60. self.loader.config_from_object('module_name')
  61. self.loader.import_from_cwd.assert_called_with('module_name')
  62. def test_conf_property(self):
  63. self.assertEqual(self.loader.conf['foo'], 'bar')
  64. self.assertEqual(self.loader._conf['foo'], 'bar')
  65. self.assertEqual(self.loader.conf['foo'], 'bar')
  66. def test_import_default_modules(self):
  67. def modnames(l):
  68. return [m.__name__ for m in l]
  69. self.app.conf.imports = ('os', 'sys')
  70. self.assertEqual(
  71. sorted(modnames(self.loader.import_default_modules())),
  72. sorted(modnames([os, sys])),
  73. )
  74. def test_import_from_cwd_custom_imp(self):
  75. def imp(module, package=None):
  76. imp.called = True
  77. imp.called = False
  78. self.loader.import_from_cwd('foo', imp=imp)
  79. self.assertTrue(imp.called)
  80. @patch('celery.utils.mail.Mailer._send')
  81. def test_mail_admins_errors(self, send):
  82. send.side_effect = KeyError()
  83. opts = dict(self.message_options, **self.server_options)
  84. with self.assertWarnsRegex(SendmailWarning, r'KeyError'):
  85. self.loader.mail_admins(fail_silently=True, **opts)
  86. with self.assertRaises(KeyError):
  87. self.loader.mail_admins(fail_silently=False, **opts)
  88. @patch('celery.utils.mail.Mailer._send')
  89. def test_mail_admins(self, send):
  90. opts = dict(self.message_options, **self.server_options)
  91. self.loader.mail_admins(**opts)
  92. self.assertTrue(send.call_args)
  93. message = send.call_args[0][0]
  94. self.assertEqual(message.to, [self.message_options['to']])
  95. self.assertEqual(message.subject, self.message_options['subject'])
  96. self.assertEqual(message.sender, self.message_options['sender'])
  97. self.assertEqual(message.body, self.message_options['body'])
  98. def test_mail_attribute(self):
  99. from celery.utils import mail
  100. loader = base.BaseLoader(app=self.app)
  101. self.assertIs(loader.mail, mail)
  102. def test_cmdline_config_ValueError(self):
  103. with self.assertRaises(ValueError):
  104. self.loader.cmdline_config_parser(['broker.port=foobar'])
  105. class test_DefaultLoader(AppCase):
  106. @patch('celery.loaders.base.find_module')
  107. def test_read_configuration_not_a_package(self, find_module):
  108. find_module.side_effect = NotAPackage()
  109. l = default.Loader(app=self.app)
  110. with self.assertRaises(NotAPackage):
  111. l.read_configuration(fail_silently=False)
  112. @patch('celery.loaders.base.find_module')
  113. @mock_environ('CELERY_CONFIG_MODULE', 'celeryconfig.py')
  114. def test_read_configuration_py_in_name(self, find_module):
  115. find_module.side_effect = NotAPackage()
  116. l = default.Loader(app=self.app)
  117. with self.assertRaises(NotAPackage):
  118. l.read_configuration(fail_silently=False)
  119. @patch('celery.loaders.base.find_module')
  120. def test_read_configuration_importerror(self, find_module):
  121. default.C_WNOCONF = True
  122. find_module.side_effect = ImportError()
  123. l = default.Loader(app=self.app)
  124. with self.assertWarnsRegex(NotConfigured, r'make sure it exists'):
  125. l.read_configuration(fail_silently=True)
  126. default.C_WNOCONF = False
  127. l.read_configuration(fail_silently=True)
  128. def test_read_configuration(self):
  129. from types import ModuleType
  130. class ConfigModule(ModuleType):
  131. pass
  132. configname = os.environ.get('CELERY_CONFIG_MODULE') or 'celeryconfig'
  133. celeryconfig = ConfigModule(module_name_t(configname))
  134. celeryconfig.imports = ('os', 'sys')
  135. prevconfig = sys.modules.get(configname)
  136. sys.modules[configname] = celeryconfig
  137. try:
  138. l = default.Loader(app=self.app)
  139. l.find_module = Mock(name='find_module')
  140. settings = l.read_configuration(fail_silently=False)
  141. self.assertTupleEqual(settings.imports, ('os', 'sys'))
  142. settings = l.read_configuration(fail_silently=False)
  143. self.assertTupleEqual(settings.imports, ('os', 'sys'))
  144. l.on_worker_init()
  145. finally:
  146. if prevconfig:
  147. sys.modules[configname] = prevconfig
  148. def test_read_configuration_ImportError(self):
  149. sentinel = object()
  150. prev, os.environ['CELERY_CONFIG_MODULE'] = (
  151. os.environ.get('CELERY_CONFIG_MODULE', sentinel), 'daweqew.dweqw',
  152. )
  153. try:
  154. l = default.Loader(app=self.app)
  155. with self.assertRaises(ImportError):
  156. l.read_configuration(fail_silently=False)
  157. l.read_configuration(fail_silently=True)
  158. finally:
  159. if prev is not sentinel:
  160. os.environ['CELERY_CONFIG_MODULE'] = prev
  161. else:
  162. os.environ.pop('CELERY_CONFIG_MODULE', None)
  163. def test_import_from_cwd(self):
  164. l = default.Loader(app=self.app)
  165. old_path = list(sys.path)
  166. try:
  167. sys.path.remove(os.getcwd())
  168. except ValueError:
  169. pass
  170. celery = sys.modules.pop('celery', None)
  171. sys.modules.pop('celery.five', None)
  172. try:
  173. self.assertTrue(l.import_from_cwd('celery'))
  174. sys.modules.pop('celery', None)
  175. sys.modules.pop('celery.five', None)
  176. sys.path.insert(0, os.getcwd())
  177. self.assertTrue(l.import_from_cwd('celery'))
  178. finally:
  179. sys.path = old_path
  180. sys.modules['celery'] = celery
  181. def test_unconfigured_settings(self):
  182. context_executed = [False]
  183. class _Loader(default.Loader):
  184. def find_module(self, name):
  185. raise ImportError(name)
  186. with warnings.catch_warnings(record=True):
  187. l = _Loader(app=self.app)
  188. self.assertFalse(l.configured)
  189. context_executed[0] = True
  190. self.assertTrue(context_executed[0])
  191. class test_AppLoader(AppCase):
  192. def setup(self):
  193. self.loader = AppLoader(app=self.app)
  194. def test_on_worker_init(self):
  195. self.app.conf.imports = ('subprocess',)
  196. sys.modules.pop('subprocess', None)
  197. self.loader.init_worker()
  198. self.assertIn('subprocess', sys.modules)
  199. class test_autodiscovery(Case):
  200. def test_autodiscover_tasks(self):
  201. base._RACE_PROTECTION = True
  202. try:
  203. base.autodiscover_tasks(['foo'])
  204. finally:
  205. base._RACE_PROTECTION = False
  206. with patch('celery.loaders.base.find_related_module') as frm:
  207. base.autodiscover_tasks(['foo'])
  208. self.assertTrue(frm.called)
  209. def test_find_related_module(self):
  210. with patch('importlib.import_module') as imp:
  211. with patch('imp.find_module') as find:
  212. imp.return_value = Mock()
  213. imp.return_value.__path__ = 'foo'
  214. base.find_related_module(base, 'tasks')
  215. def se1(val):
  216. imp.side_effect = AttributeError()
  217. imp.side_effect = se1
  218. base.find_related_module(base, 'tasks')
  219. imp.side_effect = None
  220. find.side_effect = ImportError()
  221. base.find_related_module(base, 'tasks')