test_celery.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. import sys
  2. from datetime import datetime
  3. from kombu.utils.json import dumps
  4. from celery import __main__
  5. from celery.bin.base import Error
  6. from celery.bin import celery as mod
  7. from celery.bin.celery import (
  8. Command,
  9. list_,
  10. call,
  11. purge,
  12. result,
  13. inspect,
  14. control,
  15. status,
  16. migrate,
  17. help,
  18. report,
  19. CeleryCommand,
  20. determine_exit_status,
  21. multi,
  22. main as mainfun,
  23. _RemoteControl,
  24. command,
  25. )
  26. from celery.five import WhateverIO
  27. from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
  28. from celery.tests.case import AppCase, Mock, patch
  29. class test__main__(AppCase):
  30. def test_main(self):
  31. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  32. with patch('celery.bin.celery.main') as main:
  33. __main__.main()
  34. mpc.assert_called_with()
  35. main.assert_called_with()
  36. def test_main__multi(self):
  37. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  38. with patch('celery.bin.celery.main') as main:
  39. prev, sys.argv = sys.argv, ['foo', 'multi']
  40. try:
  41. __main__.main()
  42. mpc.assert_not_called()
  43. main.assert_called_with()
  44. finally:
  45. sys.argv = prev
  46. class test_Command(AppCase):
  47. def test_Error_repr(self):
  48. x = Error('something happened')
  49. self.assertIsNotNone(x.status)
  50. self.assertTrue(x.reason)
  51. self.assertTrue(str(x))
  52. def setup(self):
  53. self.out = WhateverIO()
  54. self.err = WhateverIO()
  55. self.cmd = Command(self.app, stdout=self.out, stderr=self.err)
  56. def test_error(self):
  57. self.cmd.out = Mock()
  58. self.cmd.error('FOO')
  59. self.cmd.out.assert_called()
  60. def test_out(self):
  61. f = Mock()
  62. self.cmd.out('foo', f)
  63. def test_call(self):
  64. def ok_run():
  65. pass
  66. self.cmd.run = ok_run
  67. self.assertEqual(self.cmd(), EX_OK)
  68. def error_run():
  69. raise Error('error', EX_FAILURE)
  70. self.cmd.run = error_run
  71. self.assertEqual(self.cmd(), EX_FAILURE)
  72. def test_run_from_argv(self):
  73. with self.assertRaises(NotImplementedError):
  74. self.cmd.run_from_argv('prog', ['foo', 'bar'])
  75. def test_pretty_list(self):
  76. self.assertEqual(self.cmd.pretty([])[1], '- empty -')
  77. self.assertIn('bar', self.cmd.pretty(['foo', 'bar'])[1])
  78. def test_pretty_dict(self):
  79. self.assertIn(
  80. 'OK',
  81. str(self.cmd.pretty({'ok': 'the quick brown fox'})[0]),
  82. )
  83. self.assertIn(
  84. 'ERROR',
  85. str(self.cmd.pretty({'error': 'the quick brown fox'})[0]),
  86. )
  87. def test_pretty(self):
  88. self.assertIn('OK', str(self.cmd.pretty('the quick brown')))
  89. self.assertIn('OK', str(self.cmd.pretty(object())))
  90. self.assertIn('OK', str(self.cmd.pretty({'foo': 'bar'})))
  91. class test_list(AppCase):
  92. def test_list_bindings_no_support(self):
  93. l = list_(app=self.app, stderr=WhateverIO())
  94. management = Mock()
  95. management.get_bindings.side_effect = NotImplementedError()
  96. with self.assertRaises(Error):
  97. l.list_bindings(management)
  98. def test_run(self):
  99. l = list_(app=self.app, stderr=WhateverIO())
  100. l.run('bindings')
  101. with self.assertRaises(Error):
  102. l.run(None)
  103. with self.assertRaises(Error):
  104. l.run('foo')
  105. class test_call(AppCase):
  106. def setup(self):
  107. @self.app.task(shared=False)
  108. def add(x, y):
  109. return x + y
  110. self.add = add
  111. @patch('celery.app.base.Celery.send_task')
  112. def test_run(self, send_task):
  113. a = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())
  114. a.run(self.add.name)
  115. send_task.assert_called()
  116. a.run(self.add.name,
  117. args=dumps([4, 4]),
  118. kwargs=dumps({'x': 2, 'y': 2}))
  119. self.assertEqual(send_task.call_args[1]['args'], [4, 4])
  120. self.assertEqual(send_task.call_args[1]['kwargs'], {'x': 2, 'y': 2})
  121. a.run(self.add.name, expires=10, countdown=10)
  122. self.assertEqual(send_task.call_args[1]['expires'], 10)
  123. self.assertEqual(send_task.call_args[1]['countdown'], 10)
  124. now = datetime.now()
  125. iso = now.isoformat()
  126. a.run(self.add.name, expires=iso)
  127. self.assertEqual(send_task.call_args[1]['expires'], now)
  128. with self.assertRaises(ValueError):
  129. a.run(self.add.name, expires='foobaribazibar')
  130. class test_purge(AppCase):
  131. def test_run(self):
  132. out = WhateverIO()
  133. a = purge(app=self.app, stdout=out)
  134. a._purge = Mock(name='_purge')
  135. a._purge.return_value = 0
  136. a.run(force=True)
  137. self.assertIn('No messages purged', out.getvalue())
  138. a._purge.return_value = 100
  139. a.run(force=True)
  140. self.assertIn('100 messages', out.getvalue())
  141. a.out = Mock(name='out')
  142. a.ask = Mock(name='ask')
  143. a.run(force=False)
  144. a.ask.assert_called_with(a.warn_prompt, ('yes', 'no'), 'no')
  145. a.ask.return_value = 'yes'
  146. a.run(force=False)
  147. class test_result(AppCase):
  148. def setup(self):
  149. @self.app.task(shared=False)
  150. def add(x, y):
  151. return x + y
  152. self.add = add
  153. def test_run(self):
  154. with patch('celery.result.AsyncResult.get') as get:
  155. out = WhateverIO()
  156. r = result(app=self.app, stdout=out)
  157. get.return_value = 'Jerry'
  158. r.run('id')
  159. self.assertIn('Jerry', out.getvalue())
  160. get.return_value = 'Elaine'
  161. r.run('id', task=self.add.name)
  162. self.assertIn('Elaine', out.getvalue())
  163. with patch('celery.result.AsyncResult.traceback') as tb:
  164. r.run('id', task=self.add.name, traceback=True)
  165. self.assertIn(str(tb), out.getvalue())
  166. class test_status(AppCase):
  167. @patch('celery.bin.celery.inspect')
  168. def test_run(self, inspect_):
  169. out, err = WhateverIO(), WhateverIO()
  170. ins = inspect_.return_value = Mock()
  171. ins.run.return_value = []
  172. s = status(self.app, stdout=out, stderr=err)
  173. with self.assertRaises(Error):
  174. s.run()
  175. ins.run.return_value = ['a', 'b', 'c']
  176. s.run()
  177. self.assertIn('3 nodes online', out.getvalue())
  178. s.run(quiet=True)
  179. class test_migrate(AppCase):
  180. @patch('celery.contrib.migrate.migrate_tasks')
  181. def test_run(self, migrate_tasks):
  182. out = WhateverIO()
  183. m = migrate(app=self.app, stdout=out, stderr=WhateverIO())
  184. with self.assertRaises(TypeError):
  185. m.run()
  186. migrate_tasks.assert_not_called()
  187. m.run('memory://foo', 'memory://bar')
  188. migrate_tasks.assert_called()
  189. state = Mock()
  190. state.count = 10
  191. state.strtotal = 30
  192. m.on_migrate_task(state, {'task': 'tasks.add', 'id': 'ID'}, None)
  193. self.assertIn('10/30', out.getvalue())
  194. class test_report(AppCase):
  195. def test_run(self):
  196. out = WhateverIO()
  197. r = report(app=self.app, stdout=out)
  198. self.assertEqual(r.run(), EX_OK)
  199. self.assertTrue(out.getvalue())
  200. class test_help(AppCase):
  201. def test_run(self):
  202. out = WhateverIO()
  203. h = help(app=self.app, stdout=out)
  204. h.parser = Mock()
  205. self.assertEqual(h.run(), EX_USAGE)
  206. self.assertTrue(out.getvalue())
  207. self.assertTrue(h.usage('help'))
  208. h.parser.print_help.assert_called_with()
  209. class test_CeleryCommand(AppCase):
  210. def test_execute_from_commandline(self):
  211. x = CeleryCommand(app=self.app)
  212. x.handle_argv = Mock()
  213. x.handle_argv.return_value = 1
  214. with self.assertRaises(SystemExit):
  215. x.execute_from_commandline()
  216. x.handle_argv.return_value = True
  217. with self.assertRaises(SystemExit):
  218. x.execute_from_commandline()
  219. x.handle_argv.side_effect = KeyboardInterrupt()
  220. with self.assertRaises(SystemExit):
  221. x.execute_from_commandline()
  222. x.requires_app = True
  223. x.fake_app = False
  224. with self.assertRaises(SystemExit):
  225. x.execute_from_commandline(['celery', 'multi'])
  226. self.assertFalse(x.requires_app)
  227. self.assertTrue(x.fake_app)
  228. x.requires_app = True
  229. x.fake_app = False
  230. with self.assertRaises(SystemExit):
  231. x.execute_from_commandline(['manage.py', 'celery', 'multi'])
  232. self.assertFalse(x.requires_app)
  233. self.assertTrue(x.fake_app)
  234. def test_with_pool_option(self):
  235. x = CeleryCommand(app=self.app)
  236. self.assertIsNone(x.with_pool_option(['celery', 'events']))
  237. self.assertTrue(x.with_pool_option(['celery', 'worker']))
  238. self.assertTrue(x.with_pool_option(['manage.py', 'celery', 'worker']))
  239. def test_load_extensions_no_commands(self):
  240. with patch('celery.bin.celery.Extensions') as Ext:
  241. ext = Ext.return_value = Mock(name='Extension')
  242. ext.load.return_value = None
  243. x = CeleryCommand(app=self.app)
  244. x.load_extension_commands()
  245. def test_load_extensions_commands(self):
  246. with patch('celery.bin.celery.Extensions') as Ext:
  247. prev, mod.command_classes = list(mod.command_classes), Mock()
  248. try:
  249. ext = Ext.return_value = Mock(name='Extension')
  250. ext.load.return_value = ['foo', 'bar']
  251. x = CeleryCommand(app=self.app)
  252. x.load_extension_commands()
  253. mod.command_classes.append.assert_called_with(
  254. ('Extensions', ['foo', 'bar'], 'magenta'),
  255. )
  256. finally:
  257. mod.command_classes = prev
  258. def test_determine_exit_status(self):
  259. self.assertEqual(determine_exit_status('true'), EX_OK)
  260. self.assertEqual(determine_exit_status(''), EX_FAILURE)
  261. def test_relocate_args_from_start(self):
  262. x = CeleryCommand(app=self.app)
  263. self.assertEqual(x._relocate_args_from_start(None), [])
  264. self.assertEqual(
  265. x._relocate_args_from_start(
  266. ['-l', 'debug', 'worker', '-c', '3', '--foo'],
  267. ),
  268. ['worker', '-c', '3', '--foo', '-l', 'debug'],
  269. )
  270. self.assertEqual(
  271. x._relocate_args_from_start(
  272. ['--pool=gevent', '-l', 'debug', 'worker', '--foo', '-c', '3'],
  273. ),
  274. ['worker', '--foo', '-c', '3', '--pool=gevent', '-l', 'debug'],
  275. )
  276. self.assertEqual(
  277. x._relocate_args_from_start(['foo', '--foo=1']),
  278. ['foo', '--foo=1'],
  279. )
  280. def test_register_command(self):
  281. prev, CeleryCommand.commands = dict(CeleryCommand.commands), {}
  282. try:
  283. fun = Mock(name='fun')
  284. CeleryCommand.register_command(fun, name='foo')
  285. self.assertIs(CeleryCommand.commands['foo'], fun)
  286. finally:
  287. CeleryCommand.commands = prev
  288. def test_handle_argv(self):
  289. x = CeleryCommand(app=self.app)
  290. x.execute = Mock()
  291. x.handle_argv('celery', [])
  292. x.execute.assert_called_with('help', ['help'])
  293. x.handle_argv('celery', ['start', 'foo'])
  294. x.execute.assert_called_with('start', ['start', 'foo'])
  295. def test_execute(self):
  296. x = CeleryCommand(app=self.app)
  297. Help = x.commands['help'] = Mock()
  298. help = Help.return_value = Mock()
  299. x.execute('fooox', ['a'])
  300. help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
  301. help.reset()
  302. x.execute('help', ['help'])
  303. help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
  304. Dummy = x.commands['dummy'] = Mock()
  305. dummy = Dummy.return_value = Mock()
  306. exc = dummy.run_from_argv.side_effect = Error(
  307. 'foo', status='EX_FAILURE',
  308. )
  309. x.on_error = Mock(name='on_error')
  310. help.reset()
  311. x.execute('dummy', ['dummy'])
  312. x.on_error.assert_called_with(exc)
  313. dummy.run_from_argv.assert_called_with(
  314. x.prog_name, [], command='dummy',
  315. )
  316. help.run_from_argv.assert_called_with(
  317. x.prog_name, [], command='help',
  318. )
  319. exc = dummy.run_from_argv.side_effect = x.UsageError('foo')
  320. x.on_usage_error = Mock()
  321. x.execute('dummy', ['dummy'])
  322. x.on_usage_error.assert_called_with(exc)
  323. def test_on_usage_error(self):
  324. x = CeleryCommand(app=self.app)
  325. x.error = Mock()
  326. x.on_usage_error(x.UsageError('foo'), command=None)
  327. x.error.assert_called()
  328. x.on_usage_error(x.UsageError('foo'), command='dummy')
  329. def test_prepare_prog_name(self):
  330. x = CeleryCommand(app=self.app)
  331. main = Mock(name='__main__')
  332. main.__file__ = '/opt/foo.py'
  333. with patch.dict(sys.modules, __main__=main):
  334. self.assertEqual(x.prepare_prog_name('__main__.py'), '/opt/foo.py')
  335. self.assertEqual(x.prepare_prog_name('celery'), 'celery')
  336. class test_RemoteControl(AppCase):
  337. def test_call_interface(self):
  338. with self.assertRaises(NotImplementedError):
  339. _RemoteControl(app=self.app).call()
  340. class test_inspect(AppCase):
  341. def test_usage(self):
  342. self.assertTrue(inspect(app=self.app).usage('foo'))
  343. def test_command_info(self):
  344. i = inspect(app=self.app)
  345. self.assertTrue(i.get_command_info(
  346. 'ping', help=True, color=i.colored.red,
  347. ))
  348. def test_list_commands_color(self):
  349. i = inspect(app=self.app)
  350. self.assertTrue(i.list_commands(
  351. help=True, color=i.colored.red,
  352. ))
  353. self.assertTrue(i.list_commands(
  354. help=False, color=None,
  355. ))
  356. def test_epilog(self):
  357. self.assertTrue(inspect(app=self.app).epilog)
  358. def test_do_call_method_sql_transport_type(self):
  359. self.app.connection = Mock()
  360. conn = self.app.connection.return_value = Mock(name='Connection')
  361. conn.transport.driver_type = 'sql'
  362. i = inspect(app=self.app)
  363. with self.assertRaises(i.Error):
  364. i.do_call_method(['ping'])
  365. def test_say_directions(self):
  366. i = inspect(self.app)
  367. i.out = Mock()
  368. i.quiet = True
  369. i.say_chat('<-', 'hello out')
  370. i.out.assert_not_called()
  371. i.say_chat('->', 'hello in')
  372. i.out.assert_called()
  373. i.quiet = False
  374. i.out.reset_mock()
  375. i.say_chat('<-', 'hello out', 'body')
  376. i.out.assert_called()
  377. @patch('celery.app.control.Control.inspect')
  378. def test_run(self, real):
  379. out = WhateverIO()
  380. i = inspect(app=self.app, stdout=out)
  381. with self.assertRaises(Error):
  382. i.run()
  383. with self.assertRaises(Error):
  384. i.run('help')
  385. with self.assertRaises(Error):
  386. i.run('xyzzybaz')
  387. i.run('ping')
  388. real.assert_called()
  389. i.run('ping', destination='foo,bar')
  390. self.assertEqual(real.call_args[1]['destination'], ['foo', 'bar'])
  391. self.assertEqual(real.call_args[1]['timeout'], 0.2)
  392. callback = real.call_args[1]['callback']
  393. callback({'foo': {'ok': 'pong'}})
  394. self.assertIn('OK', out.getvalue())
  395. with patch('celery.bin.celery.json.dumps') as dumps:
  396. i.run('ping', json=True)
  397. dumps.assert_called()
  398. instance = real.return_value = Mock()
  399. instance.ping.return_value = None
  400. with self.assertRaises(Error):
  401. i.run('ping')
  402. out.seek(0)
  403. out.truncate()
  404. i.quiet = True
  405. i.say_chat('<-', 'hello')
  406. self.assertFalse(out.getvalue())
  407. def test_objgraph(self):
  408. i = inspect(app=self.app)
  409. i.call = Mock(name='call')
  410. i.objgraph('Message', foo=1)
  411. i.call.assert_called_with('objgraph', 'Message', foo=1)
  412. def test_conf(self):
  413. i = inspect(app=self.app)
  414. i.call = Mock(name='call')
  415. i.conf(with_defaults=True, foo=1)
  416. i.call.assert_called_with('conf', True, foo=1)
  417. class test_control(AppCase):
  418. def control(self, patch_call, *args, **kwargs):
  419. kwargs.setdefault('app', Mock(name='app'))
  420. c = control(*args, **kwargs)
  421. if patch_call:
  422. c.call = Mock(name='control.call')
  423. return c
  424. def test_call(self):
  425. i = self.control(False)
  426. i.call('foo', 1, kw=2)
  427. i.app.control.foo.assert_called_with(1, kw=2, reply=True)
  428. def test_pool_grow(self):
  429. i = self.control(True)
  430. i.pool_grow('pool_grow', n=2)
  431. i.call.assert_called_with('pool_grow', 2)
  432. def test_pool_shrink(self):
  433. i = self.control(True)
  434. i.pool_shrink('pool_shrink', n=2)
  435. i.call.assert_called_with('pool_shrink', 2)
  436. def test_rate_limit(self):
  437. i = self.control(True)
  438. i.rate_limit('rate_limit', 'proj.add', '1/s')
  439. i.call.assert_called_with('rate_limit', 'proj.add', '1/s')
  440. def test_time_limit(self):
  441. i = self.control(True)
  442. i.time_limit('time_limit', 'proj.add', 10, 30)
  443. i.call.assert_called_with('time_limit', 'proj.add', 10, 30)
  444. def test_add_consumer(self):
  445. i = self.control(True)
  446. i.add_consumer(
  447. 'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
  448. durable=True,
  449. )
  450. i.call.assert_called_with(
  451. 'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
  452. durable=True,
  453. )
  454. def test_cancel_consumer(self):
  455. i = self.control(True)
  456. i.cancel_consumer('cancel_consumer', 'queue')
  457. i.call.assert_called_with('cancel_consumer', 'queue')
  458. class test_multi(AppCase):
  459. def test_get_options(self):
  460. self.assertIsNone(multi(app=self.app).get_options())
  461. def test_run_from_argv(self):
  462. with patch('celery.bin.multi.MultiTool') as MultiTool:
  463. m = MultiTool.return_value = Mock()
  464. multi(self.app).run_from_argv('celery', ['arg'], command='multi')
  465. m.execute_from_commandline.assert_called_with(
  466. ['multi', 'arg'], 'celery',
  467. )
  468. class test_main(AppCase):
  469. @patch('celery.bin.celery.CeleryCommand')
  470. def test_main(self, Command):
  471. cmd = Command.return_value = Mock()
  472. mainfun()
  473. cmd.execute_from_commandline.assert_called_with(None)
  474. @patch('celery.bin.celery.CeleryCommand')
  475. def test_main_KeyboardInterrupt(self, Command):
  476. cmd = Command.return_value = Mock()
  477. cmd.execute_from_commandline.side_effect = KeyboardInterrupt()
  478. mainfun()
  479. cmd.execute_from_commandline.assert_called_with(None)
  480. class test_compat(AppCase):
  481. def test_compat_command_decorator(self):
  482. with patch('celery.bin.celery.CeleryCommand') as CC:
  483. self.assertEqual(command(), CC.register_command)
  484. fun = Mock(name='fun')
  485. command(fun)
  486. CC.register_command.assert_called_with(fun)