test_celery.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. import sys
  4. from datetime import datetime
  5. from case import Mock, patch
  6. from kombu.utils.json import dumps
  7. from celery import __main__
  8. from celery.bin.base import Error
  9. from celery.bin import celery as mod
  10. from celery.bin.celery import (
  11. Command,
  12. list_,
  13. call,
  14. purge,
  15. result,
  16. inspect,
  17. control,
  18. status,
  19. migrate,
  20. help,
  21. report,
  22. CeleryCommand,
  23. determine_exit_status,
  24. multi,
  25. main as mainfun,
  26. _RemoteControl,
  27. command,
  28. )
  29. from celery.five import WhateverIO
  30. from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
  31. class test__main__:
  32. def test_main(self):
  33. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  34. with patch('celery.bin.celery.main') as main:
  35. __main__.main()
  36. mpc.assert_called_with()
  37. main.assert_called_with()
  38. def test_main__multi(self):
  39. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  40. with patch('celery.bin.celery.main') as main:
  41. prev, sys.argv = sys.argv, ['foo', 'multi']
  42. try:
  43. __main__.main()
  44. mpc.assert_not_called()
  45. main.assert_called_with()
  46. finally:
  47. sys.argv = prev
  48. class test_Command:
  49. def test_Error_repr(self):
  50. x = Error('something happened')
  51. assert x.status is not None
  52. assert x.reason
  53. assert str(x)
  54. def setup(self):
  55. self.out = WhateverIO()
  56. self.err = WhateverIO()
  57. self.cmd = Command(self.app, stdout=self.out, stderr=self.err)
  58. def test_error(self):
  59. self.cmd.out = Mock()
  60. self.cmd.error('FOO')
  61. self.cmd.out.assert_called()
  62. def test_out(self):
  63. f = Mock()
  64. self.cmd.out('foo', f)
  65. def test_call(self):
  66. def ok_run():
  67. pass
  68. self.cmd.run = ok_run
  69. assert self.cmd() == EX_OK
  70. def error_run():
  71. raise Error('error', EX_FAILURE)
  72. self.cmd.run = error_run
  73. assert self.cmd() == EX_FAILURE
  74. def test_run_from_argv(self):
  75. with pytest.raises(NotImplementedError):
  76. self.cmd.run_from_argv('prog', ['foo', 'bar'])
  77. def test_pretty_list(self):
  78. assert self.cmd.pretty([])[1] == '- empty -'
  79. assert 'bar', self.cmd.pretty(['foo' in 'bar'][1])
  80. def test_pretty_dict(self, text='the quick brown fox'):
  81. assert 'OK' in str(self.cmd.pretty({'ok': text})[0])
  82. assert 'ERROR' in str(self.cmd.pretty({'error': text})[0])
  83. def test_pretty(self):
  84. assert 'OK' in str(self.cmd.pretty('the quick brown'))
  85. assert 'OK' in str(self.cmd.pretty(object()))
  86. assert 'OK' in str(self.cmd.pretty({'foo': 'bar'}))
  87. class test_list:
  88. def test_list_bindings_no_support(self):
  89. l = list_(app=self.app, stderr=WhateverIO())
  90. management = Mock()
  91. management.get_bindings.side_effect = NotImplementedError()
  92. with pytest.raises(Error):
  93. l.list_bindings(management)
  94. def test_run(self):
  95. l = list_(app=self.app, stderr=WhateverIO())
  96. l.run('bindings')
  97. with pytest.raises(Error):
  98. l.run(None)
  99. with pytest.raises(Error):
  100. l.run('foo')
  101. class test_call:
  102. def setup(self):
  103. @self.app.task(shared=False)
  104. def add(x, y):
  105. return x + y
  106. self.add = add
  107. @patch('celery.app.base.Celery.send_task')
  108. def test_run(self, send_task):
  109. a = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())
  110. a.run(self.add.name)
  111. send_task.assert_called()
  112. a.run(self.add.name,
  113. args=dumps([4, 4]),
  114. kwargs=dumps({'x': 2, 'y': 2}))
  115. assert send_task.call_args[1]['args'], [4 == 4]
  116. assert send_task.call_args[1]['kwargs'] == {'x': 2, 'y': 2}
  117. a.run(self.add.name, expires=10, countdown=10)
  118. assert send_task.call_args[1]['expires'] == 10
  119. assert send_task.call_args[1]['countdown'] == 10
  120. now = datetime.now()
  121. iso = now.isoformat()
  122. a.run(self.add.name, expires=iso)
  123. assert send_task.call_args[1]['expires'] == now
  124. with pytest.raises(ValueError):
  125. a.run(self.add.name, expires='foobaribazibar')
  126. class test_purge:
  127. def test_run(self):
  128. out = WhateverIO()
  129. a = purge(app=self.app, stdout=out)
  130. a._purge = Mock(name='_purge')
  131. a._purge.return_value = 0
  132. a.run(force=True)
  133. assert 'No messages purged' in out.getvalue()
  134. a._purge.return_value = 100
  135. a.run(force=True)
  136. assert '100 messages' in out.getvalue()
  137. a.out = Mock(name='out')
  138. a.ask = Mock(name='ask')
  139. a.run(force=False)
  140. a.ask.assert_called_with(a.warn_prompt, ('yes', 'no'), 'no')
  141. a.ask.return_value = 'yes'
  142. a.run(force=False)
  143. class test_result:
  144. def setup(self):
  145. @self.app.task(shared=False)
  146. def add(x, y):
  147. return x + y
  148. self.add = add
  149. def test_run(self):
  150. with patch('celery.result.AsyncResult.get') as get:
  151. out = WhateverIO()
  152. r = result(app=self.app, stdout=out)
  153. get.return_value = 'Jerry'
  154. r.run('id')
  155. assert 'Jerry' in out.getvalue()
  156. get.return_value = 'Elaine'
  157. r.run('id', task=self.add.name)
  158. assert 'Elaine' in out.getvalue()
  159. with patch('celery.result.AsyncResult.traceback') as tb:
  160. r.run('id', task=self.add.name, traceback=True)
  161. assert str(tb) in out.getvalue()
  162. class test_status:
  163. @patch('celery.bin.celery.inspect')
  164. def test_run(self, inspect_):
  165. out, err = WhateverIO(), WhateverIO()
  166. ins = inspect_.return_value = Mock()
  167. ins.run.return_value = []
  168. s = status(self.app, stdout=out, stderr=err)
  169. with pytest.raises(Error):
  170. s.run()
  171. ins.run.return_value = ['a', 'b', 'c']
  172. s.run()
  173. assert '3 nodes online' in out.getvalue()
  174. s.run(quiet=True)
  175. class test_migrate:
  176. @patch('celery.contrib.migrate.migrate_tasks')
  177. def test_run(self, migrate_tasks):
  178. out = WhateverIO()
  179. m = migrate(app=self.app, stdout=out, stderr=WhateverIO())
  180. with pytest.raises(TypeError):
  181. m.run()
  182. migrate_tasks.assert_not_called()
  183. m.run('memory://foo', 'memory://bar')
  184. migrate_tasks.assert_called()
  185. state = Mock()
  186. state.count = 10
  187. state.strtotal = 30
  188. m.on_migrate_task(state, {'task': 'tasks.add', 'id': 'ID'}, None)
  189. assert '10/30' in out.getvalue()
  190. class test_report:
  191. def test_run(self):
  192. out = WhateverIO()
  193. r = report(app=self.app, stdout=out)
  194. assert r.run() == EX_OK
  195. assert out.getvalue()
  196. class test_help:
  197. def test_run(self):
  198. out = WhateverIO()
  199. h = help(app=self.app, stdout=out)
  200. h.parser = Mock()
  201. assert h.run() == EX_USAGE
  202. assert out.getvalue()
  203. assert h.usage('help')
  204. h.parser.print_help.assert_called_with()
  205. class test_CeleryCommand:
  206. def test_execute_from_commandline(self):
  207. x = CeleryCommand(app=self.app)
  208. x.handle_argv = Mock()
  209. x.handle_argv.return_value = 1
  210. with pytest.raises(SystemExit):
  211. x.execute_from_commandline()
  212. x.handle_argv.return_value = True
  213. with pytest.raises(SystemExit):
  214. x.execute_from_commandline()
  215. x.handle_argv.side_effect = KeyboardInterrupt()
  216. with pytest.raises(SystemExit):
  217. x.execute_from_commandline()
  218. x.respects_app_option = True
  219. with pytest.raises(SystemExit):
  220. x.execute_from_commandline(['celery', 'multi'])
  221. assert not x.respects_app_option
  222. x.respects_app_option = True
  223. with pytest.raises(SystemExit):
  224. x.execute_from_commandline(['manage.py', 'celery', 'multi'])
  225. assert not x.respects_app_option
  226. def test_with_pool_option(self):
  227. x = CeleryCommand(app=self.app)
  228. assert x.with_pool_option(['celery', 'events']) is None
  229. assert x.with_pool_option(['celery', 'worker'])
  230. assert x.with_pool_option(['manage.py', 'celery', 'worker'])
  231. def test_load_extensions_no_commands(self):
  232. with patch('celery.bin.celery.Extensions') as Ext:
  233. ext = Ext.return_value = Mock(name='Extension')
  234. ext.load.return_value = None
  235. x = CeleryCommand(app=self.app)
  236. x.load_extension_commands()
  237. def test_load_extensions_commands(self):
  238. with patch('celery.bin.celery.Extensions') as Ext:
  239. prev, mod.command_classes = list(mod.command_classes), Mock()
  240. try:
  241. ext = Ext.return_value = Mock(name='Extension')
  242. ext.load.return_value = ['foo', 'bar']
  243. x = CeleryCommand(app=self.app)
  244. x.load_extension_commands()
  245. mod.command_classes.append.assert_called_with(
  246. ('Extensions', ['foo', 'bar'], 'magenta'),
  247. )
  248. finally:
  249. mod.command_classes = prev
  250. def test_determine_exit_status(self):
  251. assert determine_exit_status('true') == EX_OK
  252. assert determine_exit_status('') == EX_FAILURE
  253. def test_relocate_args_from_start(self):
  254. x = CeleryCommand(app=self.app)
  255. assert x._relocate_args_from_start(None) == []
  256. relargs1 = x._relocate_args_from_start([
  257. '-l', 'debug', 'worker', '-c', '3', '--foo',
  258. ])
  259. assert relargs1 == ['worker', '-c', '3', '--foo', '-l', 'debug']
  260. relargs2 = x._relocate_args_from_start([
  261. '--pool=gevent', '-l', 'debug', 'worker', '--foo', '-c', '3',
  262. ])
  263. assert relargs2 == [
  264. 'worker', '--foo', '-c', '3',
  265. '--pool=gevent', '-l', 'debug',
  266. ]
  267. assert x._relocate_args_from_start(['foo', '--foo=1']) == [
  268. 'foo', '--foo=1',
  269. ]
  270. def test_register_command(self):
  271. prev, CeleryCommand.commands = dict(CeleryCommand.commands), {}
  272. try:
  273. fun = Mock(name='fun')
  274. CeleryCommand.register_command(fun, name='foo')
  275. assert CeleryCommand.commands['foo'] is fun
  276. finally:
  277. CeleryCommand.commands = prev
  278. def test_handle_argv(self):
  279. x = CeleryCommand(app=self.app)
  280. x.execute = Mock()
  281. x.handle_argv('celery', [])
  282. x.execute.assert_called_with('help', ['help'])
  283. x.handle_argv('celery', ['start', 'foo'])
  284. x.execute.assert_called_with('start', ['start', 'foo'])
  285. def test_execute(self):
  286. x = CeleryCommand(app=self.app)
  287. Help = x.commands['help'] = Mock()
  288. help = Help.return_value = Mock()
  289. x.execute('fooox', ['a'])
  290. help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
  291. help.reset()
  292. x.execute('help', ['help'])
  293. help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
  294. Dummy = x.commands['dummy'] = Mock()
  295. dummy = Dummy.return_value = Mock()
  296. exc = dummy.run_from_argv.side_effect = Error(
  297. 'foo', status='EX_FAILURE',
  298. )
  299. x.on_error = Mock(name='on_error')
  300. help.reset()
  301. x.execute('dummy', ['dummy'])
  302. x.on_error.assert_called_with(exc)
  303. dummy.run_from_argv.assert_called_with(
  304. x.prog_name, [], command='dummy',
  305. )
  306. help.run_from_argv.assert_called_with(
  307. x.prog_name, [], command='help',
  308. )
  309. exc = dummy.run_from_argv.side_effect = x.UsageError('foo')
  310. x.on_usage_error = Mock()
  311. x.execute('dummy', ['dummy'])
  312. x.on_usage_error.assert_called_with(exc)
  313. def test_on_usage_error(self):
  314. x = CeleryCommand(app=self.app)
  315. x.error = Mock()
  316. x.on_usage_error(x.UsageError('foo'), command=None)
  317. x.error.assert_called()
  318. x.on_usage_error(x.UsageError('foo'), command='dummy')
  319. def test_prepare_prog_name(self):
  320. x = CeleryCommand(app=self.app)
  321. main = Mock(name='__main__')
  322. main.__file__ = '/opt/foo.py'
  323. with patch.dict(sys.modules, __main__=main):
  324. assert x.prepare_prog_name('__main__.py') == '/opt/foo.py'
  325. assert x.prepare_prog_name('celery') == 'celery'
  326. class test_RemoteControl:
  327. def test_call_interface(self):
  328. with pytest.raises(NotImplementedError):
  329. _RemoteControl(app=self.app).call()
  330. class test_inspect:
  331. def test_usage(self):
  332. assert inspect(app=self.app).usage('foo')
  333. def test_command_info(self):
  334. i = inspect(app=self.app)
  335. assert i.get_command_info(
  336. 'ping', help=True, color=i.colored.red, app=self.app,
  337. )
  338. def test_list_commands_color(self):
  339. i = inspect(app=self.app)
  340. assert i.list_commands(help=True, color=i.colored.red, app=self.app)
  341. assert i.list_commands(help=False, color=None, app=self.app)
  342. def test_epilog(self):
  343. assert inspect(app=self.app).epilog
  344. def test_do_call_method_sql_transport_type(self):
  345. self.app.connection = Mock()
  346. conn = self.app.connection.return_value = Mock(name='Connection')
  347. conn.transport.driver_type = 'sql'
  348. i = inspect(app=self.app)
  349. with pytest.raises(i.Error):
  350. i.do_call_method(['ping'])
  351. def test_say_directions(self):
  352. i = inspect(self.app)
  353. i.out = Mock()
  354. i.quiet = True
  355. i.say_chat('<-', 'hello out')
  356. i.out.assert_not_called()
  357. i.say_chat('->', 'hello in')
  358. i.out.assert_called()
  359. i.quiet = False
  360. i.out.reset_mock()
  361. i.say_chat('<-', 'hello out', 'body')
  362. i.out.assert_called()
  363. @patch('celery.app.control.Control.inspect')
  364. def test_run(self, real):
  365. out = WhateverIO()
  366. i = inspect(app=self.app, stdout=out)
  367. with pytest.raises(Error):
  368. i.run()
  369. with pytest.raises(Error):
  370. i.run('help')
  371. with pytest.raises(Error):
  372. i.run('xyzzybaz')
  373. i.run('ping')
  374. real.assert_called()
  375. i.run('ping', destination='foo,bar')
  376. assert real.call_args[1]['destination'], ['foo' == 'bar']
  377. assert real.call_args[1]['timeout'] == 0.2
  378. callback = real.call_args[1]['callback']
  379. callback({'foo': {'ok': 'pong'}})
  380. assert 'OK' in out.getvalue()
  381. with patch('celery.bin.celery.dumps') as dumps:
  382. i.run('ping', json=True)
  383. dumps.assert_called()
  384. instance = real.return_value = Mock()
  385. instance._request.return_value = None
  386. with pytest.raises(Error):
  387. i.run('ping')
  388. out.seek(0)
  389. out.truncate()
  390. i.quiet = True
  391. i.say_chat('<-', 'hello')
  392. assert not out.getvalue()
  393. class test_control:
  394. def control(self, patch_call, *args, **kwargs):
  395. kwargs.setdefault('app', Mock(name='app'))
  396. c = control(*args, **kwargs)
  397. if patch_call:
  398. c.call = Mock(name='control.call')
  399. return c
  400. def test_call(self):
  401. i = self.control(False)
  402. i.call('foo', arguments={'kw': 2})
  403. i.app.control.broadcast.assert_called_with(
  404. 'foo', arguments={'kw': 2}, reply=True)
  405. class test_multi:
  406. def test_get_options(self):
  407. assert multi(app=self.app).get_options() is None
  408. def test_run_from_argv(self):
  409. with patch('celery.bin.multi.MultiTool') as MultiTool:
  410. m = MultiTool.return_value = Mock()
  411. multi(self.app).run_from_argv('celery', ['arg'], command='multi')
  412. m.execute_from_commandline.assert_called_with(
  413. ['multi', 'arg'], 'celery',
  414. )
  415. class test_main:
  416. @patch('celery.bin.celery.CeleryCommand')
  417. def test_main(self, Command):
  418. cmd = Command.return_value = Mock()
  419. mainfun()
  420. cmd.execute_from_commandline.assert_called_with(None)
  421. @patch('celery.bin.celery.CeleryCommand')
  422. def test_main_KeyboardInterrupt(self, Command):
  423. cmd = Command.return_value = Mock()
  424. cmd.execute_from_commandline.side_effect = KeyboardInterrupt()
  425. mainfun()
  426. cmd.execute_from_commandline.assert_called_with(None)
  427. class test_compat:
  428. def test_compat_command_decorator(self):
  429. with patch('celery.bin.celery.CeleryCommand') as CC:
  430. assert command() == CC.register_command
  431. fun = Mock(name='fun')
  432. command(fun)
  433. CC.register_command.assert_called_with(fun)