test_base.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. import os
  2. from celery.bin.base import (
  3. Command,
  4. Option,
  5. Extensions,
  6. HelpFormatter,
  7. )
  8. from celery.utils.objects import Bunch
  9. from celery.tests.case import (
  10. AppCase, Mock, depends_on_current_app, mock, patch,
  11. )
  12. class MyApp:
  13. user_options = {'preload': None}
  14. APP = MyApp() # <-- Used by test_with_custom_app
  15. class MockCommand(Command):
  16. mock_args = ('arg1', 'arg2', 'arg3')
  17. def parse_options(self, prog_name, arguments, command=None):
  18. options = Bunch(foo='bar', prog_name=prog_name)
  19. return options, self.mock_args
  20. def run(self, *args, **kwargs):
  21. return args, kwargs
  22. class test_Extensions(AppCase):
  23. def test_load(self):
  24. with patch('pkg_resources.iter_entry_points') as iterep:
  25. with patch('celery.bin.base.symbol_by_name') as symbyname:
  26. ep = Mock()
  27. ep.name = 'ep'
  28. ep.module_name = 'foo'
  29. ep.attrs = ['bar', 'baz']
  30. iterep.return_value = [ep]
  31. cls = symbyname.return_value = Mock()
  32. register = Mock()
  33. e = Extensions('unit', register)
  34. e.load()
  35. symbyname.assert_called_with('foo:bar')
  36. register.assert_called_with(cls, name='ep')
  37. with patch('celery.bin.base.symbol_by_name') as symbyname:
  38. symbyname.side_effect = SyntaxError()
  39. with patch('warnings.warn') as warn:
  40. e.load()
  41. warn.assert_called()
  42. with patch('celery.bin.base.symbol_by_name') as symbyname:
  43. symbyname.side_effect = KeyError('foo')
  44. with self.assertRaises(KeyError):
  45. e.load()
  46. class test_HelpFormatter(AppCase):
  47. def test_format_epilog(self):
  48. f = HelpFormatter()
  49. self.assertTrue(f.format_epilog('hello'))
  50. self.assertFalse(f.format_epilog(''))
  51. def test_format_description(self):
  52. f = HelpFormatter()
  53. self.assertTrue(f.format_description('hello'))
  54. class test_Command(AppCase):
  55. def test_get_options(self):
  56. cmd = Command()
  57. cmd.option_list = (1, 2, 3)
  58. self.assertTupleEqual(cmd.get_options(), (1, 2, 3))
  59. def test_custom_description(self):
  60. class C(Command):
  61. description = 'foo'
  62. c = C()
  63. self.assertEqual(c.description, 'foo')
  64. def test_register_callbacks(self):
  65. c = Command(on_error=8, on_usage_error=9)
  66. self.assertEqual(c.on_error, 8)
  67. self.assertEqual(c.on_usage_error, 9)
  68. def test_run_raises_UsageError(self):
  69. cb = Mock()
  70. c = Command(on_usage_error=cb)
  71. c.verify_args = Mock()
  72. c.run = Mock()
  73. exc = c.run.side_effect = c.UsageError('foo', status=3)
  74. self.assertEqual(c(), exc.status)
  75. cb.assert_called_with(exc)
  76. c.verify_args.assert_called_with(())
  77. def test_default_on_usage_error(self):
  78. cmd = Command()
  79. cmd.handle_error = Mock()
  80. exc = Exception()
  81. cmd.on_usage_error(exc)
  82. cmd.handle_error.assert_called_with(exc)
  83. def test_verify_args_missing(self):
  84. c = Command()
  85. def run(a, b, c):
  86. pass
  87. c.run = run
  88. with self.assertRaises(c.UsageError):
  89. c.verify_args((1,))
  90. c.verify_args((1, 2, 3))
  91. def test_run_interface(self):
  92. with self.assertRaises(NotImplementedError):
  93. Command().run()
  94. @patch('sys.stdout')
  95. def test_early_version(self, stdout):
  96. cmd = Command()
  97. with self.assertRaises(SystemExit):
  98. cmd.early_version(['--version'])
  99. def test_execute_from_commandline(self):
  100. cmd = MockCommand(app=self.app)
  101. args1, kwargs1 = cmd.execute_from_commandline() # sys.argv
  102. self.assertTupleEqual(args1, cmd.mock_args)
  103. self.assertDictContainsSubset({'foo': 'bar'}, kwargs1)
  104. self.assertTrue(kwargs1.get('prog_name'))
  105. args2, kwargs2 = cmd.execute_from_commandline(['foo']) # pass list
  106. self.assertTupleEqual(args2, cmd.mock_args)
  107. self.assertDictContainsSubset({'foo': 'bar', 'prog_name': 'foo'},
  108. kwargs2)
  109. @mock.stdouts
  110. def test_with_bogus_args(self, _, stderr):
  111. cmd = MockCommand(app=self.app)
  112. cmd.supports_args = False
  113. with self.assertRaises(SystemExit):
  114. cmd.execute_from_commandline(argv=['--bogus'])
  115. self.assertTrue(stderr.getvalue())
  116. self.assertIn('Unrecognized', stderr.getvalue())
  117. def test_with_custom_config_module(self):
  118. prev = os.environ.pop('CELERY_CONFIG_MODULE', None)
  119. try:
  120. cmd = MockCommand(app=self.app)
  121. cmd.setup_app_from_commandline(['--config=foo.bar.baz'])
  122. self.assertEqual(os.environ.get('CELERY_CONFIG_MODULE'),
  123. 'foo.bar.baz')
  124. finally:
  125. if prev:
  126. os.environ['CELERY_CONFIG_MODULE'] = prev
  127. else:
  128. os.environ.pop('CELERY_CONFIG_MODULE', None)
  129. def test_with_custom_broker(self):
  130. prev = os.environ.pop('CELERY_BROKER_URL', None)
  131. try:
  132. cmd = MockCommand(app=self.app)
  133. cmd.setup_app_from_commandline(['--broker=xyzza://'])
  134. self.assertEqual(
  135. os.environ.get('CELERY_BROKER_URL'), 'xyzza://',
  136. )
  137. finally:
  138. if prev:
  139. os.environ['CELERY_BROKER_URL'] = prev
  140. else:
  141. os.environ.pop('CELERY_BROKER_URL', None)
  142. def test_with_custom_app(self):
  143. cmd = MockCommand(app=self.app)
  144. app = '.'.join([__name__, 'APP'])
  145. cmd.setup_app_from_commandline(['--app=%s' % (app,),
  146. '--loglevel=INFO'])
  147. self.assertIs(cmd.app, APP)
  148. cmd.setup_app_from_commandline(['-A', app,
  149. '--loglevel=INFO'])
  150. self.assertIs(cmd.app, APP)
  151. def test_setup_app_sets_quiet(self):
  152. cmd = MockCommand(app=self.app)
  153. cmd.setup_app_from_commandline(['-q'])
  154. self.assertTrue(cmd.quiet)
  155. cmd2 = MockCommand(app=self.app)
  156. cmd2.setup_app_from_commandline(['--quiet'])
  157. self.assertTrue(cmd2.quiet)
  158. def test_setup_app_sets_chdir(self):
  159. with patch('os.chdir') as chdir:
  160. cmd = MockCommand(app=self.app)
  161. cmd.setup_app_from_commandline(['--workdir=/opt'])
  162. chdir.assert_called_with('/opt')
  163. def test_setup_app_sets_loader(self):
  164. prev = os.environ.get('CELERY_LOADER')
  165. try:
  166. cmd = MockCommand(app=self.app)
  167. cmd.setup_app_from_commandline(['--loader=X.Y:Z'])
  168. self.assertEqual(os.environ['CELERY_LOADER'], 'X.Y:Z')
  169. finally:
  170. if prev is not None:
  171. os.environ['CELERY_LOADER'] = prev
  172. def test_setup_app__no_requires_app(self):
  173. cmd = MockCommand(app=self.app)
  174. cmd.requires_app = False
  175. with patch('celery.bin.base.Celery') as cp:
  176. cmd.setup_app_from_commandline(['--app=x.y:z'])
  177. cp.assert_called()
  178. def test_setup_app_custom_app(self):
  179. cmd = MockCommand(app=self.app)
  180. app = cmd.app = Mock()
  181. app.user_options = {'preload': None}
  182. cmd.setup_app_from_commandline([])
  183. self.assertEqual(cmd.app, app)
  184. def test_find_app_suspects(self):
  185. cmd = MockCommand(app=self.app)
  186. self.assertTrue(cmd.find_app('celery.tests.bin.proj.app'))
  187. self.assertTrue(cmd.find_app('celery.tests.bin.proj'))
  188. self.assertTrue(cmd.find_app('celery.tests.bin.proj:hello'))
  189. self.assertTrue(cmd.find_app('celery.tests.bin.proj.hello'))
  190. self.assertTrue(cmd.find_app('celery.tests.bin.proj.app:app'))
  191. self.assertTrue(cmd.find_app('celery.tests.bin.proj.app.app'))
  192. with self.assertRaises(AttributeError):
  193. cmd.find_app('celery.tests.bin')
  194. with self.assertRaises(AttributeError):
  195. cmd.find_app(__name__)
  196. def test_ask(self):
  197. try:
  198. input = self.patch('celery.bin.base.input')
  199. except AttributeError:
  200. input = self.patch('builtins.input')
  201. cmd = MockCommand(app=self.app)
  202. input.return_value = 'yes'
  203. self.assertEqual(cmd.ask('q', ('yes', 'no'), 'no'), 'yes')
  204. input.return_value = 'nop'
  205. self.assertEqual(cmd.ask('q', ('yes', 'no'), 'no'), 'no')
  206. def test_host_format(self):
  207. cmd = MockCommand(app=self.app)
  208. with patch('celery.utils.nodenames.gethostname') as hn:
  209. hn.return_value = 'blacktron.example.com'
  210. self.assertEqual(cmd.host_format(''), '')
  211. self.assertEqual(
  212. cmd.host_format('celery@%h'),
  213. 'celery@blacktron.example.com',
  214. )
  215. self.assertEqual(
  216. cmd.host_format('celery@%d'),
  217. 'celery@example.com',
  218. )
  219. self.assertEqual(
  220. cmd.host_format('celery@%n'),
  221. 'celery@blacktron',
  222. )
  223. def test_say_chat_quiet(self):
  224. cmd = MockCommand(app=self.app)
  225. cmd.quiet = True
  226. self.assertIsNone(cmd.say_chat('<-', 'foo', 'foo'))
  227. def test_say_chat_show_body(self):
  228. cmd = MockCommand(app=self.app)
  229. cmd.out = Mock()
  230. cmd.show_body = True
  231. cmd.say_chat('->', 'foo', 'body')
  232. cmd.out.assert_called_with('body')
  233. def test_say_chat_no_body(self):
  234. cmd = MockCommand(app=self.app)
  235. cmd.out = Mock()
  236. cmd.show_body = False
  237. cmd.say_chat('->', 'foo', 'body')
  238. @depends_on_current_app
  239. def test_with_cmdline_config(self):
  240. cmd = MockCommand(app=self.app)
  241. cmd.enable_config_from_cmdline = True
  242. cmd.namespace = 'worker'
  243. rest = cmd.setup_app_from_commandline(argv=[
  244. '--loglevel=INFO', '--',
  245. 'broker.url=amqp://broker.example.com',
  246. '.prefetch_multiplier=100'])
  247. self.assertEqual(cmd.app.conf.broker_url,
  248. 'amqp://broker.example.com')
  249. self.assertEqual(cmd.app.conf.worker_prefetch_multiplier, 100)
  250. self.assertListEqual(rest, ['--loglevel=INFO'])
  251. cmd.app = None
  252. cmd.get_app = Mock(name='get_app')
  253. cmd.get_app.return_value = self.app
  254. self.app.user_options['preload'] = [
  255. Option('--foo', action='store_true'),
  256. ]
  257. cmd.setup_app_from_commandline(argv=[
  258. '--foo', '--loglevel=INFO', '--',
  259. 'broker.url=amqp://broker.example.com',
  260. '.prefetch_multiplier=100'])
  261. self.assertIs(cmd.app, cmd.get_app())
  262. def test_preparse_options__required_short(self):
  263. cmd = MockCommand(app=self.app)
  264. with self.assertRaises(ValueError):
  265. cmd.preparse_options(
  266. ['a', '-f'], [Option('-f', action='store')])
  267. def test_preparse_options__longopt_whitespace(self):
  268. cmd = MockCommand(app=self.app)
  269. cmd.preparse_options(
  270. ['a', '--foo', 'val'], [Option('--foo', action='store')])
  271. def test_preparse_options__shortopt_store_true(self):
  272. cmd = MockCommand(app=self.app)
  273. cmd.preparse_options(
  274. ['a', '--foo'], [Option('--foo', action='store_true')])
  275. def test_get_default_app(self):
  276. self.patch('celery._state.get_current_app')
  277. cmd = MockCommand(app=self.app)
  278. from celery._state import get_current_app
  279. self.assertIs(cmd._get_default_app(), get_current_app())
  280. def test_set_colored(self):
  281. cmd = MockCommand(app=self.app)
  282. cmd.colored = 'foo'
  283. self.assertEqual(cmd.colored, 'foo')
  284. def test_set_no_color(self):
  285. cmd = MockCommand(app=self.app)
  286. cmd.no_color = False
  287. _ = cmd.colored # noqa
  288. cmd.no_color = True
  289. self.assertFalse(cmd.colored.enabled)
  290. def test_find_app(self):
  291. cmd = MockCommand(app=self.app)
  292. with patch('celery.bin.base.symbol_by_name') as sbn:
  293. from types import ModuleType
  294. x = ModuleType('proj')
  295. def on_sbn(*args, **kwargs):
  296. def after(*args, **kwargs):
  297. x.app = 'quick brown fox'
  298. x.__path__ = None
  299. return x
  300. sbn.side_effect = after
  301. return x
  302. sbn.side_effect = on_sbn
  303. x.__path__ = [True]
  304. self.assertEqual(cmd.find_app('proj'), 'quick brown fox')
  305. def test_parse_preload_options_shortopt(self):
  306. cmd = Command()
  307. cmd.preload_options = (Option('-s', action='store', dest='silent'),)
  308. acc = cmd.parse_preload_options(['-s', 'yes'])
  309. self.assertEqual(acc.get('silent'), 'yes')
  310. def test_parse_preload_options_with_equals_and_append(self):
  311. cmd = Command()
  312. opt = Option('--zoom', action='append', default=[])
  313. cmd.preload_options = (opt,)
  314. acc = cmd.parse_preload_options(['--zoom=1', '--zoom=2'])
  315. self.assertEqual(acc, {'zoom': ['1', '2']})
  316. def test_parse_preload_options_without_equals_and_append(self):
  317. cmd = Command()
  318. opt = Option('--zoom', action='append', default=[])
  319. cmd.preload_options = (opt,)
  320. acc = cmd.parse_preload_options(['--zoom', '1', '--zoom', '2'])
  321. self.assertEqual(acc, {'zoom': ['1', '2']})