test_base.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. from __future__ import absolute_import, unicode_literals
  2. import os
  3. import pytest
  4. from case import Mock, mock, patch
  5. from celery.bin.base import Command, Extensions, Option
  6. from celery.five import bytes_if_py2
  7. class MyApp(object):
  8. user_options = {'preload': None}
  9. APP = MyApp() # <-- Used by test_with_custom_app
  10. class MockCommand(Command):
  11. mock_args = ('arg1', 'arg2', 'arg3')
  12. def parse_options(self, prog_name, arguments, command=None):
  13. options = dict(foo='bar', prog_name=prog_name)
  14. return options, self.mock_args
  15. def run(self, *args, **kwargs):
  16. return args, kwargs
  17. class test_Extensions:
  18. def test_load(self):
  19. with patch('pkg_resources.iter_entry_points') as iterep:
  20. with patch('celery.utils.imports.symbol_by_name') as symbyname:
  21. ep = Mock()
  22. ep.name = 'ep'
  23. ep.module_name = 'foo'
  24. ep.attrs = ['bar', 'baz']
  25. iterep.return_value = [ep]
  26. cls = symbyname.return_value = Mock()
  27. register = Mock()
  28. e = Extensions('unit', register)
  29. e.load()
  30. symbyname.assert_called_with('foo:bar')
  31. register.assert_called_with(cls, name='ep')
  32. with patch('celery.utils.imports.symbol_by_name') as symbyname:
  33. symbyname.side_effect = SyntaxError()
  34. with patch('warnings.warn') as warn:
  35. e.load()
  36. warn.assert_called()
  37. with patch('celery.utils.imports.symbol_by_name') as symbyname:
  38. symbyname.side_effect = KeyError('foo')
  39. with pytest.raises(KeyError):
  40. e.load()
  41. class test_Command:
  42. def test_get_options(self):
  43. cmd = Command()
  44. cmd.option_list = (1, 2, 3)
  45. assert cmd.get_options() == (1, 2, 3)
  46. def test_custom_description(self):
  47. class C(Command):
  48. description = 'foo'
  49. c = C()
  50. assert c.description == 'foo'
  51. def test_format_epilog(self):
  52. assert Command()._format_epilog('hello')
  53. assert not Command()._format_epilog('')
  54. def test_format_description(self):
  55. assert Command()._format_description('hello')
  56. def test_register_callbacks(self):
  57. c = Command(on_error=8, on_usage_error=9)
  58. assert c.on_error == 8
  59. assert c.on_usage_error == 9
  60. def test_run_raises_UsageError(self):
  61. cb = Mock()
  62. c = Command(on_usage_error=cb)
  63. c.verify_args = Mock()
  64. c.run = Mock()
  65. exc = c.run.side_effect = c.UsageError('foo', status=3)
  66. assert c() == exc.status
  67. cb.assert_called_with(exc)
  68. c.verify_args.assert_called_with(())
  69. def test_default_on_usage_error(self):
  70. cmd = Command()
  71. cmd.handle_error = Mock()
  72. exc = Exception()
  73. cmd.on_usage_error(exc)
  74. cmd.handle_error.assert_called_with(exc)
  75. def test_verify_args_missing(self):
  76. c = Command()
  77. def run(a, b, c):
  78. pass
  79. c.run = run
  80. with pytest.raises(c.UsageError):
  81. c.verify_args((1,))
  82. c.verify_args((1, 2, 3))
  83. def test_run_interface(self):
  84. with pytest.raises(NotImplementedError):
  85. Command().run()
  86. @patch('sys.stdout')
  87. def test_early_version(self, stdout):
  88. cmd = Command()
  89. with pytest.raises(SystemExit):
  90. cmd.early_version(['--version'])
  91. def test_execute_from_commandline(self, app):
  92. cmd = MockCommand(app=app)
  93. args1, kwargs1 = cmd.execute_from_commandline() # sys.argv
  94. assert args1 == cmd.mock_args
  95. assert kwargs1['foo'] == 'bar'
  96. assert kwargs1.get('prog_name')
  97. args2, kwargs2 = cmd.execute_from_commandline(['foo']) # pass list
  98. assert args2 == cmd.mock_args
  99. assert kwargs2['foo'] == 'bar'
  100. assert kwargs2['prog_name'] == 'foo'
  101. def test_with_bogus_args(self, app):
  102. with mock.stdouts() as (_, stderr):
  103. cmd = MockCommand(app=app)
  104. cmd.supports_args = False
  105. with pytest.raises(SystemExit):
  106. cmd.execute_from_commandline(argv=['--bogus'])
  107. assert stderr.getvalue()
  108. assert 'Unrecognized' in stderr.getvalue()
  109. def test_with_custom_config_module(self, app):
  110. prev = os.environ.pop('CELERY_CONFIG_MODULE', None)
  111. try:
  112. cmd = MockCommand(app=app)
  113. cmd.setup_app_from_commandline(['--config=foo.bar.baz'])
  114. assert os.environ.get('CELERY_CONFIG_MODULE') == 'foo.bar.baz'
  115. finally:
  116. if prev:
  117. os.environ['CELERY_CONFIG_MODULE'] = prev
  118. else:
  119. os.environ.pop('CELERY_CONFIG_MODULE', None)
  120. def test_with_custom_broker(self, app):
  121. prev = os.environ.pop('CELERY_BROKER_URL', None)
  122. try:
  123. cmd = MockCommand(app=app)
  124. cmd.setup_app_from_commandline(['--broker=xyzza://'])
  125. assert os.environ.get('CELERY_BROKER_URL') == 'xyzza://'
  126. finally:
  127. if prev:
  128. os.environ['CELERY_BROKER_URL'] = prev
  129. else:
  130. os.environ.pop('CELERY_BROKER_URL', None)
  131. def test_with_custom_app(self, app):
  132. cmd = MockCommand(app=app)
  133. appstr = '.'.join([__name__, 'APP'])
  134. cmd.setup_app_from_commandline(['--app=%s' % (appstr,),
  135. '--loglevel=INFO'])
  136. assert cmd.app is APP
  137. cmd.setup_app_from_commandline(['-A', appstr,
  138. '--loglevel=INFO'])
  139. assert cmd.app is APP
  140. def test_setup_app_sets_quiet(self, app):
  141. cmd = MockCommand(app=app)
  142. cmd.setup_app_from_commandline(['-q'])
  143. assert cmd.quiet
  144. cmd2 = MockCommand(app=app)
  145. cmd2.setup_app_from_commandline(['--quiet'])
  146. assert cmd2.quiet
  147. def test_setup_app_sets_chdir(self, app):
  148. with patch('os.chdir') as chdir:
  149. cmd = MockCommand(app=app)
  150. cmd.setup_app_from_commandline(['--workdir=/opt'])
  151. chdir.assert_called_with('/opt')
  152. def test_setup_app_sets_loader(self, app):
  153. prev = os.environ.get('CELERY_LOADER')
  154. try:
  155. cmd = MockCommand(app=app)
  156. cmd.setup_app_from_commandline(['--loader=X.Y:Z'])
  157. assert os.environ['CELERY_LOADER'] == 'X.Y:Z'
  158. finally:
  159. if prev is not None:
  160. os.environ['CELERY_LOADER'] = prev
  161. else:
  162. del(os.environ['CELERY_LOADER'])
  163. def test_setup_app_no_respect(self, app):
  164. cmd = MockCommand(app=app)
  165. cmd.respects_app_option = False
  166. with patch('celery.bin.base.Celery') as cp:
  167. cmd.setup_app_from_commandline(['--app=x.y:z'])
  168. cp.assert_called()
  169. def test_setup_app_custom_app(self, app):
  170. cmd = MockCommand(app=app)
  171. app = cmd.app = Mock()
  172. app.user_options = {'preload': None}
  173. cmd.setup_app_from_commandline([])
  174. assert cmd.app == app
  175. def test_find_app_suspects(self, app):
  176. cmd = MockCommand(app=app)
  177. assert cmd.find_app('t.unit.bin.proj.app')
  178. assert cmd.find_app('t.unit.bin.proj')
  179. assert cmd.find_app('t.unit.bin.proj:hello')
  180. assert cmd.find_app('t.unit.bin.proj.hello')
  181. assert cmd.find_app('t.unit.bin.proj.app:app')
  182. assert cmd.find_app('t.unit.bin.proj.app.app')
  183. with pytest.raises(AttributeError):
  184. cmd.find_app('t.unit.bin')
  185. with pytest.raises(AttributeError):
  186. cmd.find_app(__name__)
  187. def test_ask(self, app, patching):
  188. try:
  189. input = patching('celery.bin.base.input')
  190. except AttributeError:
  191. input = patching('builtins.input')
  192. cmd = MockCommand(app=app)
  193. input.return_value = 'yes'
  194. assert cmd.ask('q', ('yes', 'no'), 'no') == 'yes'
  195. input.return_value = 'nop'
  196. assert cmd.ask('q', ('yes', 'no'), 'no') == 'no'
  197. def test_host_format(self, app):
  198. cmd = MockCommand(app=app)
  199. with patch('celery.utils.nodenames.gethostname') as hn:
  200. hn.return_value = 'blacktron.example.com'
  201. assert cmd.host_format('') == ''
  202. assert (cmd.host_format('celery@%h') ==
  203. 'celery@blacktron.example.com')
  204. assert cmd.host_format('celery@%d') == 'celery@example.com'
  205. assert cmd.host_format('celery@%n') == 'celery@blacktron'
  206. def test_say_chat_quiet(self, app):
  207. cmd = MockCommand(app=app)
  208. cmd.quiet = True
  209. assert cmd.say_chat('<-', 'foo', 'foo') is None
  210. def test_say_chat_show_body(self, app):
  211. cmd = MockCommand(app=app)
  212. cmd.out = Mock()
  213. cmd.show_body = True
  214. cmd.say_chat('->', 'foo', 'body')
  215. cmd.out.assert_called_with('body')
  216. def test_say_chat_no_body(self, app):
  217. cmd = MockCommand(app=app)
  218. cmd.out = Mock()
  219. cmd.show_body = False
  220. cmd.say_chat('->', 'foo', 'body')
  221. @pytest.mark.usefixtures('depends_on_current_app')
  222. def test_with_cmdline_config(self, app):
  223. cmd = MockCommand(app=app)
  224. cmd.enable_config_from_cmdline = True
  225. cmd.namespace = 'worker'
  226. rest = cmd.setup_app_from_commandline(argv=[
  227. '--loglevel=INFO', '--',
  228. 'broker.url=amqp://broker.example.com',
  229. '.prefetch_multiplier=100'])
  230. assert cmd.app.conf.broker_url == 'amqp://broker.example.com'
  231. assert cmd.app.conf.worker_prefetch_multiplier == 100
  232. assert rest == ['--loglevel=INFO']
  233. cmd.app = None
  234. cmd.get_app = Mock(name='get_app')
  235. cmd.get_app.return_value = app
  236. app.user_options['preload'] = [
  237. Option('--foo', action='store_true'),
  238. ]
  239. cmd.setup_app_from_commandline(argv=[
  240. '--foo', '--loglevel=INFO', '--',
  241. 'broker.url=amqp://broker.example.com',
  242. '.prefetch_multiplier=100'])
  243. assert cmd.app is cmd.get_app()
  244. def test_get_default_app(self, app, patching):
  245. patching('celery._state.get_current_app')
  246. cmd = MockCommand(app=app)
  247. from celery._state import get_current_app
  248. assert cmd._get_default_app() is get_current_app()
  249. def test_set_colored(self, app):
  250. cmd = MockCommand(app=app)
  251. cmd.colored = 'foo'
  252. assert cmd.colored == 'foo'
  253. def test_set_no_color(self, app):
  254. cmd = MockCommand(app=app)
  255. cmd.no_color = False
  256. _ = cmd.colored # noqa
  257. cmd.no_color = True
  258. assert not cmd.colored.enabled
  259. def test_find_app(self, app):
  260. cmd = MockCommand(app=app)
  261. with patch('celery.utils.imports.symbol_by_name') as sbn:
  262. from types import ModuleType
  263. x = ModuleType(bytes_if_py2('proj'))
  264. def on_sbn(*args, **kwargs):
  265. def after(*args, **kwargs):
  266. x.app = 'quick brown fox'
  267. x.__path__ = None
  268. return x
  269. sbn.side_effect = after
  270. return x
  271. sbn.side_effect = on_sbn
  272. x.__path__ = [True]
  273. assert cmd.find_app('proj') == 'quick brown fox'
  274. def test_parse_preload_options_shortopt(self):
  275. class TestCommand(Command):
  276. def add_preload_arguments(self, parser):
  277. parser.add_argument('-s', action='store', dest='silent')
  278. cmd = TestCommand()
  279. acc = cmd.parse_preload_options(['-s', 'yes'])
  280. assert acc.get('silent') == 'yes'
  281. def test_parse_preload_options_with_equals_and_append(self):
  282. class TestCommand(Command):
  283. def add_preload_arguments(self, parser):
  284. parser.add_argument('--zoom', action='append', default=[])
  285. cmd = Command()
  286. acc = cmd.parse_preload_options(['--zoom=1', '--zoom=2'])
  287. assert acc, {'zoom': ['1' == '2']}
  288. def test_parse_preload_options_without_equals_and_append(self):
  289. cmd = Command()
  290. opt = Option('--zoom', action='append', default=[])
  291. cmd.preload_options = (opt,)
  292. acc = cmd.parse_preload_options(['--zoom', '1', '--zoom', '2'])
  293. assert acc, {'zoom': ['1' == '2']}