test_base.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. from __future__ import absolute_import
  2. import os
  3. from celery.bin.base import (
  4. Command,
  5. Option,
  6. Extensions,
  7. HelpFormatter,
  8. )
  9. from celery.tests.case import (
  10. AppCase, Mock, depends_on_current_app, override_stdouts, patch,
  11. )
  12. class Object(object):
  13. pass
  14. class MyApp(object):
  15. user_options = {'preload': None}
  16. APP = MyApp() # <-- Used by test_with_custom_app
  17. class MockCommand(Command):
  18. mock_args = ('arg1', 'arg2', 'arg3')
  19. def parse_options(self, prog_name, arguments, command=None):
  20. options = Object()
  21. options.foo = 'bar'
  22. options.prog_name = prog_name
  23. return options, self.mock_args
  24. def run(self, *args, **kwargs):
  25. return args, kwargs
  26. class test_Extensions(AppCase):
  27. def test_load(self):
  28. with patch('pkg_resources.iter_entry_points') as iterep:
  29. with patch('celery.bin.base.symbol_by_name') as symbyname:
  30. ep = Mock()
  31. ep.name = 'ep'
  32. ep.module_name = 'foo'
  33. ep.attrs = ['bar', 'baz']
  34. iterep.return_value = [ep]
  35. cls = symbyname.return_value = Mock()
  36. register = Mock()
  37. e = Extensions('unit', register)
  38. e.load()
  39. symbyname.assert_called_with('foo:bar')
  40. register.assert_called_with(cls, name='ep')
  41. with patch('celery.bin.base.symbol_by_name') as symbyname:
  42. symbyname.side_effect = SyntaxError()
  43. with patch('warnings.warn') as warn:
  44. e.load()
  45. self.assertTrue(warn.called)
  46. with patch('celery.bin.base.symbol_by_name') as symbyname:
  47. symbyname.side_effect = KeyError('foo')
  48. with self.assertRaises(KeyError):
  49. e.load()
  50. class test_HelpFormatter(AppCase):
  51. def test_format_epilog(self):
  52. f = HelpFormatter()
  53. self.assertTrue(f.format_epilog('hello'))
  54. self.assertFalse(f.format_epilog(''))
  55. def test_format_description(self):
  56. f = HelpFormatter()
  57. self.assertTrue(f.format_description('hello'))
  58. class test_Command(AppCase):
  59. def test_get_options(self):
  60. cmd = Command()
  61. cmd.option_list = (1, 2, 3)
  62. self.assertTupleEqual(cmd.get_options(), (1, 2, 3))
  63. def test_custom_description(self):
  64. class C(Command):
  65. description = 'foo'
  66. c = C()
  67. self.assertEqual(c.description, 'foo')
  68. def test_register_callbacks(self):
  69. c = Command(on_error=8, on_usage_error=9)
  70. self.assertEqual(c.on_error, 8)
  71. self.assertEqual(c.on_usage_error, 9)
  72. def test_run_raises_UsageError(self):
  73. cb = Mock()
  74. c = Command(on_usage_error=cb)
  75. c.verify_args = Mock()
  76. c.run = Mock()
  77. exc = c.run.side_effect = c.UsageError('foo', status=3)
  78. self.assertEqual(c(), exc.status)
  79. cb.assert_called_with(exc)
  80. c.verify_args.assert_called_with(())
  81. def test_default_on_usage_error(self):
  82. cmd = Command()
  83. cmd.handle_error = Mock()
  84. exc = Exception()
  85. cmd.on_usage_error(exc)
  86. cmd.handle_error.assert_called_with(exc)
  87. def test_verify_args_missing(self):
  88. c = Command()
  89. def run(a, b, c):
  90. pass
  91. c.run = run
  92. with self.assertRaises(c.UsageError):
  93. c.verify_args((1,))
  94. c.verify_args((1, 2, 3))
  95. def test_run_interface(self):
  96. with self.assertRaises(NotImplementedError):
  97. Command().run()
  98. @patch('sys.stdout')
  99. def test_early_version(self, stdout):
  100. cmd = Command()
  101. with self.assertRaises(SystemExit):
  102. cmd.early_version(['--version'])
  103. def test_execute_from_commandline(self):
  104. cmd = MockCommand(app=self.app)
  105. args1, kwargs1 = cmd.execute_from_commandline() # sys.argv
  106. self.assertTupleEqual(args1, cmd.mock_args)
  107. self.assertDictContainsSubset({'foo': 'bar'}, kwargs1)
  108. self.assertTrue(kwargs1.get('prog_name'))
  109. args2, kwargs2 = cmd.execute_from_commandline(['foo']) # pass list
  110. self.assertTupleEqual(args2, cmd.mock_args)
  111. self.assertDictContainsSubset({'foo': 'bar', 'prog_name': 'foo'},
  112. kwargs2)
  113. def test_with_bogus_args(self):
  114. with override_stdouts() as (_, stderr):
  115. cmd = MockCommand(app=self.app)
  116. cmd.supports_args = False
  117. with self.assertRaises(SystemExit):
  118. cmd.execute_from_commandline(argv=['--bogus'])
  119. self.assertTrue(stderr.getvalue())
  120. self.assertIn('Unrecognized', stderr.getvalue())
  121. def test_with_custom_config_module(self):
  122. prev = os.environ.pop('CELERY_CONFIG_MODULE', None)
  123. try:
  124. cmd = MockCommand(app=self.app)
  125. cmd.setup_app_from_commandline(['--config=foo.bar.baz'])
  126. self.assertEqual(os.environ.get('CELERY_CONFIG_MODULE'),
  127. 'foo.bar.baz')
  128. finally:
  129. if prev:
  130. os.environ['CELERY_CONFIG_MODULE'] = prev
  131. else:
  132. os.environ.pop('CELERY_CONFIG_MODULE', None)
  133. def test_with_custom_broker(self):
  134. prev = os.environ.pop('CELERY_BROKER_URL', None)
  135. try:
  136. cmd = MockCommand(app=self.app)
  137. cmd.setup_app_from_commandline(['--broker=xyzza://'])
  138. self.assertEqual(
  139. os.environ.get('CELERY_BROKER_URL'), 'xyzza://',
  140. )
  141. finally:
  142. if prev:
  143. os.environ['CELERY_BROKER_URL'] = prev
  144. else:
  145. os.environ.pop('CELERY_BROKER_URL', None)
  146. def test_with_custom_app(self):
  147. cmd = MockCommand(app=self.app)
  148. app = '.'.join([__name__, 'APP'])
  149. cmd.setup_app_from_commandline(['--app=%s' % (app,),
  150. '--loglevel=INFO'])
  151. self.assertIs(cmd.app, APP)
  152. cmd.setup_app_from_commandline(['-A', app,
  153. '--loglevel=INFO'])
  154. self.assertIs(cmd.app, APP)
  155. def test_setup_app_sets_quiet(self):
  156. cmd = MockCommand(app=self.app)
  157. cmd.setup_app_from_commandline(['-q'])
  158. self.assertTrue(cmd.quiet)
  159. cmd2 = MockCommand(app=self.app)
  160. cmd2.setup_app_from_commandline(['--quiet'])
  161. self.assertTrue(cmd2.quiet)
  162. def test_setup_app_sets_chdir(self):
  163. with patch('os.chdir') as chdir:
  164. cmd = MockCommand(app=self.app)
  165. cmd.setup_app_from_commandline(['--workdir=/opt'])
  166. chdir.assert_called_with('/opt')
  167. def test_setup_app_sets_loader(self):
  168. prev = os.environ.get('CELERY_LOADER')
  169. try:
  170. cmd = MockCommand(app=self.app)
  171. cmd.setup_app_from_commandline(['--loader=X.Y:Z'])
  172. self.assertEqual(os.environ['CELERY_LOADER'], 'X.Y:Z')
  173. finally:
  174. if prev is not None:
  175. os.environ['CELERY_LOADER'] = prev
  176. def test_setup_app_no_respect(self):
  177. cmd = MockCommand(app=self.app)
  178. cmd.respects_app_option = False
  179. with patch('celery.bin.base.Celery') as cp:
  180. cmd.setup_app_from_commandline(['--app=x.y:z'])
  181. self.assertTrue(cp.called)
  182. def test_setup_app_custom_app(self):
  183. cmd = MockCommand(app=self.app)
  184. app = cmd.app = Mock()
  185. app.user_options = {'preload': None}
  186. cmd.setup_app_from_commandline([])
  187. self.assertEqual(cmd.app, app)
  188. def test_find_app_suspects(self):
  189. cmd = MockCommand(app=self.app)
  190. self.assertTrue(cmd.find_app('celery.tests.bin.proj.app'))
  191. self.assertTrue(cmd.find_app('celery.tests.bin.proj'))
  192. self.assertTrue(cmd.find_app('celery.tests.bin.proj:hello'))
  193. self.assertTrue(cmd.find_app('celery.tests.bin.proj.app:app'))
  194. with self.assertRaises(AttributeError):
  195. cmd.find_app(__name__)
  196. def test_host_format(self):
  197. cmd = MockCommand(app=self.app)
  198. with patch('socket.gethostname') as hn:
  199. hn.return_value = 'blacktron.example.com'
  200. self.assertEqual(cmd.host_format(''), '')
  201. self.assertEqual(
  202. cmd.host_format('celery@%h'),
  203. 'celery@blacktron.example.com',
  204. )
  205. self.assertEqual(
  206. cmd.host_format('celery@%d'),
  207. 'celery@example.com',
  208. )
  209. self.assertEqual(
  210. cmd.host_format('celery@%n'),
  211. 'celery@blacktron',
  212. )
  213. def test_say_chat_quiet(self):
  214. cmd = MockCommand(app=self.app)
  215. cmd.quiet = True
  216. self.assertIsNone(cmd.say_chat('<-', 'foo', 'foo'))
  217. def test_say_chat_show_body(self):
  218. cmd = MockCommand(app=self.app)
  219. cmd.out = Mock()
  220. cmd.show_body = True
  221. cmd.say_chat('->', 'foo', 'body')
  222. cmd.out.assert_called_with('body')
  223. def test_say_chat_no_body(self):
  224. cmd = MockCommand(app=self.app)
  225. cmd.out = Mock()
  226. cmd.show_body = False
  227. cmd.say_chat('->', 'foo', 'body')
  228. @depends_on_current_app
  229. def test_with_cmdline_config(self):
  230. cmd = MockCommand(app=self.app)
  231. cmd.enable_config_from_cmdline = True
  232. cmd.namespace = 'celeryd'
  233. rest = cmd.setup_app_from_commandline(argv=[
  234. '--loglevel=INFO', '--',
  235. 'broker.url=amqp://broker.example.com',
  236. '.prefetch_multiplier=100'])
  237. self.assertEqual(cmd.app.conf.BROKER_URL,
  238. 'amqp://broker.example.com')
  239. self.assertEqual(cmd.app.conf.CELERYD_PREFETCH_MULTIPLIER, 100)
  240. self.assertListEqual(rest, ['--loglevel=INFO'])
  241. def test_find_app(self):
  242. cmd = MockCommand(app=self.app)
  243. with patch('celery.bin.base.symbol_by_name') as sbn:
  244. from types import ModuleType
  245. x = ModuleType('proj')
  246. def on_sbn(*args, **kwargs):
  247. def after(*args, **kwargs):
  248. x.app = 'quick brown fox'
  249. x.__path__ = None
  250. return x
  251. sbn.side_effect = after
  252. return x
  253. sbn.side_effect = on_sbn
  254. x.__path__ = [True]
  255. self.assertEqual(cmd.find_app('proj'), 'quick brown fox')
  256. def test_parse_preload_options_shortopt(self):
  257. cmd = Command()
  258. cmd.preload_options = (Option('-s', action='store', dest='silent'),)
  259. acc = cmd.parse_preload_options(['-s', 'yes'])
  260. self.assertEqual(acc.get('silent'), 'yes')
  261. def test_parse_preload_options_with_equals_and_append(self):
  262. cmd = Command()
  263. opt = Option('--zoom', action='append', default=[])
  264. cmd.preload_options = (opt,)
  265. acc = cmd.parse_preload_options(['--zoom=1', '--zoom=2'])
  266. self.assertEqual(acc, {'zoom': ['1', '2']})
  267. def test_parse_preload_options_without_equals_and_append(self):
  268. cmd = Command()
  269. opt = Option('--zoom', action='append', default=[])
  270. cmd.preload_options = (opt,)
  271. acc = cmd.parse_preload_options(['--zoom', '1', '--zoom', '2'])
  272. self.assertEqual(acc, {'zoom': ['1', '2']})