test_celery.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584
  1. from __future__ import absolute_import
  2. import sys
  3. from anyjson import dumps
  4. from datetime import datetime
  5. from celery import __main__
  6. from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
  7. from celery.bin.base import Error
  8. from celery.bin.celery import (
  9. Command,
  10. list_,
  11. call,
  12. purge,
  13. result,
  14. inspect,
  15. control,
  16. status,
  17. migrate,
  18. help,
  19. report,
  20. CeleryCommand,
  21. determine_exit_status,
  22. multi,
  23. main as mainfun,
  24. _RemoteControl,
  25. command,
  26. )
  27. from celery.tests.case import (
  28. AppCase, Mock, WhateverIO, override_stdouts, patch,
  29. )
  30. class test__main__(AppCase):
  31. def test_warn_deprecated(self):
  32. with override_stdouts() as (stdout, _):
  33. __main__._warn_deprecated('YADDA YADDA')
  34. self.assertIn('command is deprecated', stdout.getvalue())
  35. self.assertIn('YADDA YADDA', stdout.getvalue())
  36. def test_main(self):
  37. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  38. with patch('celery.bin.celery.main') as main:
  39. __main__.main()
  40. mpc.assert_called_with()
  41. main.assert_called_with()
  42. def test_compat_worker(self):
  43. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  44. with patch('celery.__main__._warn_deprecated') as depr:
  45. with patch('celery.bin.worker.main') as main:
  46. __main__._compat_worker()
  47. mpc.assert_called_with()
  48. depr.assert_called_with('celery worker')
  49. main.assert_called_with()
  50. def test_compat_multi(self):
  51. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  52. with patch('celery.__main__._warn_deprecated') as depr:
  53. with patch('celery.bin.multi.main') as main:
  54. __main__._compat_multi()
  55. self.assertFalse(mpc.called)
  56. depr.assert_called_with('celery multi')
  57. main.assert_called_with()
  58. def test_compat_beat(self):
  59. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  60. with patch('celery.__main__._warn_deprecated') as depr:
  61. with patch('celery.bin.beat.main') as main:
  62. __main__._compat_beat()
  63. mpc.assert_called_with()
  64. depr.assert_called_with('celery beat')
  65. main.assert_called_with()
  66. class test_Command(AppCase):
  67. def test_Error_repr(self):
  68. x = Error('something happened')
  69. self.assertIsNotNone(x.status)
  70. self.assertTrue(x.reason)
  71. self.assertTrue(str(x))
  72. def setup(self):
  73. self.out = WhateverIO()
  74. self.err = WhateverIO()
  75. self.cmd = Command(self.app, stdout=self.out, stderr=self.err)
  76. def test_error(self):
  77. self.cmd.out = Mock()
  78. self.cmd.error('FOO')
  79. self.assertTrue(self.cmd.out.called)
  80. def test_out(self):
  81. f = Mock()
  82. self.cmd.out('foo', f)
  83. def test_call(self):
  84. def ok_run():
  85. pass
  86. self.cmd.run = ok_run
  87. self.assertEqual(self.cmd(), EX_OK)
  88. def error_run():
  89. raise Error('error', EX_FAILURE)
  90. self.cmd.run = error_run
  91. self.assertEqual(self.cmd(), EX_FAILURE)
  92. def test_run_from_argv(self):
  93. with self.assertRaises(NotImplementedError):
  94. self.cmd.run_from_argv('prog', ['foo', 'bar'])
  95. def test_pretty_list(self):
  96. self.assertEqual(self.cmd.pretty([])[1], '- empty -')
  97. self.assertIn('bar', self.cmd.pretty(['foo', 'bar'])[1])
  98. def test_pretty_dict(self):
  99. self.assertIn(
  100. 'OK',
  101. str(self.cmd.pretty({'ok': 'the quick brown fox'})[0]),
  102. )
  103. self.assertIn(
  104. 'ERROR',
  105. str(self.cmd.pretty({'error': 'the quick brown fox'})[0]),
  106. )
  107. def test_pretty(self):
  108. self.assertIn('OK', str(self.cmd.pretty('the quick brown')))
  109. self.assertIn('OK', str(self.cmd.pretty(object())))
  110. self.assertIn('OK', str(self.cmd.pretty({'foo': 'bar'})))
  111. class test_list(AppCase):
  112. def test_list_bindings_no_support(self):
  113. l = list_(app=self.app, stderr=WhateverIO())
  114. management = Mock()
  115. management.get_bindings.side_effect = NotImplementedError()
  116. with self.assertRaises(Error):
  117. l.list_bindings(management)
  118. def test_run(self):
  119. l = list_(app=self.app, stderr=WhateverIO())
  120. l.run('bindings')
  121. with self.assertRaises(Error):
  122. l.run(None)
  123. with self.assertRaises(Error):
  124. l.run('foo')
  125. class test_call(AppCase):
  126. def setup(self):
  127. @self.app.task(shared=False)
  128. def add(x, y):
  129. return x + y
  130. self.add = add
  131. @patch('celery.app.base.Celery.send_task')
  132. def test_run(self, send_task):
  133. a = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())
  134. a.run(self.add.name)
  135. self.assertTrue(send_task.called)
  136. a.run(self.add.name,
  137. args=dumps([4, 4]),
  138. kwargs=dumps({'x': 2, 'y': 2}))
  139. self.assertEqual(send_task.call_args[1]['args'], [4, 4])
  140. self.assertEqual(send_task.call_args[1]['kwargs'], {'x': 2, 'y': 2})
  141. a.run(self.add.name, expires=10, countdown=10)
  142. self.assertEqual(send_task.call_args[1]['expires'], 10)
  143. self.assertEqual(send_task.call_args[1]['countdown'], 10)
  144. now = datetime.now()
  145. iso = now.isoformat()
  146. a.run(self.add.name, expires=iso)
  147. self.assertEqual(send_task.call_args[1]['expires'], now)
  148. with self.assertRaises(ValueError):
  149. a.run(self.add.name, expires='foobaribazibar')
  150. class test_purge(AppCase):
  151. @patch('celery.app.control.Control.purge')
  152. def test_run(self, purge_):
  153. out = WhateverIO()
  154. a = purge(app=self.app, stdout=out)
  155. purge_.return_value = 0
  156. a.run()
  157. self.assertIn('No messages purged', out.getvalue())
  158. purge_.return_value = 100
  159. a.run()
  160. self.assertIn('100 messages', out.getvalue())
  161. class test_result(AppCase):
  162. def setup(self):
  163. @self.app.task(shared=False)
  164. def add(x, y):
  165. return x + y
  166. self.add = add
  167. def test_run(self):
  168. with patch('celery.result.AsyncResult.get') as get:
  169. out = WhateverIO()
  170. r = result(app=self.app, stdout=out)
  171. get.return_value = 'Jerry'
  172. r.run('id')
  173. self.assertIn('Jerry', out.getvalue())
  174. get.return_value = 'Elaine'
  175. r.run('id', task=self.add.name)
  176. self.assertIn('Elaine', out.getvalue())
  177. with patch('celery.result.AsyncResult.traceback') as tb:
  178. r.run('id', task=self.add.name, traceback=True)
  179. self.assertIn(str(tb), out.getvalue())
  180. class test_status(AppCase):
  181. @patch('celery.bin.celery.inspect')
  182. def test_run(self, inspect_):
  183. out, err = WhateverIO(), WhateverIO()
  184. ins = inspect_.return_value = Mock()
  185. ins.run.return_value = []
  186. s = status(self.app, stdout=out, stderr=err)
  187. with self.assertRaises(Error):
  188. s.run()
  189. ins.run.return_value = ['a', 'b', 'c']
  190. s.run()
  191. self.assertIn('3 nodes online', out.getvalue())
  192. s.run(quiet=True)
  193. class test_migrate(AppCase):
  194. @patch('celery.contrib.migrate.migrate_tasks')
  195. def test_run(self, migrate_tasks):
  196. out = WhateverIO()
  197. m = migrate(app=self.app, stdout=out, stderr=WhateverIO())
  198. with self.assertRaises(TypeError):
  199. m.run()
  200. self.assertFalse(migrate_tasks.called)
  201. m.run('memory://foo', 'memory://bar')
  202. self.assertTrue(migrate_tasks.called)
  203. state = Mock()
  204. state.count = 10
  205. state.strtotal = 30
  206. m.on_migrate_task(state, {'task': 'tasks.add', 'id': 'ID'}, None)
  207. self.assertIn('10/30', out.getvalue())
  208. class test_report(AppCase):
  209. def test_run(self):
  210. out = WhateverIO()
  211. r = report(app=self.app, stdout=out)
  212. self.assertEqual(r.run(), EX_OK)
  213. self.assertTrue(out.getvalue())
  214. class test_help(AppCase):
  215. def test_run(self):
  216. out = WhateverIO()
  217. h = help(app=self.app, stdout=out)
  218. h.parser = Mock()
  219. self.assertEqual(h.run(), EX_USAGE)
  220. self.assertTrue(out.getvalue())
  221. self.assertTrue(h.usage('help'))
  222. h.parser.print_help.assert_called_with()
  223. class test_CeleryCommand(AppCase):
  224. def test_execute_from_commandline(self):
  225. x = CeleryCommand(app=self.app)
  226. x.handle_argv = Mock()
  227. x.handle_argv.return_value = 1
  228. with self.assertRaises(SystemExit):
  229. x.execute_from_commandline()
  230. x.handle_argv.return_value = True
  231. with self.assertRaises(SystemExit):
  232. x.execute_from_commandline()
  233. x.handle_argv.side_effect = KeyboardInterrupt()
  234. with self.assertRaises(SystemExit):
  235. x.execute_from_commandline()
  236. x.respects_app_option = True
  237. with self.assertRaises(SystemExit):
  238. x.execute_from_commandline(['celery', 'multi'])
  239. self.assertFalse(x.respects_app_option)
  240. x.respects_app_option = True
  241. with self.assertRaises(SystemExit):
  242. x.execute_from_commandline(['manage.py', 'celery', 'multi'])
  243. self.assertFalse(x.respects_app_option)
  244. def test_with_pool_option(self):
  245. x = CeleryCommand(app=self.app)
  246. self.assertIsNone(x.with_pool_option(['celery', 'events']))
  247. self.assertTrue(x.with_pool_option(['celery', 'worker']))
  248. self.assertTrue(x.with_pool_option(['manage.py', 'celery', 'worker']))
  249. def test_load_extensions_no_commands(self):
  250. with patch('celery.bin.celery.Extensions') as Ext:
  251. ext = Ext.return_value = Mock(name='Extension')
  252. ext.load.return_value = None
  253. x = CeleryCommand(app=self.app)
  254. x.load_extension_commands()
  255. def test_determine_exit_status(self):
  256. self.assertEqual(determine_exit_status('true'), EX_OK)
  257. self.assertEqual(determine_exit_status(''), EX_FAILURE)
  258. def test_relocate_args_from_start(self):
  259. x = CeleryCommand(app=self.app)
  260. self.assertEqual(x._relocate_args_from_start(None), [])
  261. self.assertEqual(
  262. x._relocate_args_from_start(
  263. ['-l', 'debug', 'worker', '-c', '3', '--foo'],
  264. ),
  265. ['worker', '-c', '3', '--foo', '-l', 'debug'],
  266. )
  267. self.assertEqual(
  268. x._relocate_args_from_start(
  269. ['--pool=gevent', '-l', 'debug', 'worker', '--foo', '-c', '3'],
  270. ),
  271. ['worker', '--foo', '-c', '3', '--pool=gevent', '-l', 'debug'],
  272. )
  273. self.assertEqual(
  274. x._relocate_args_from_start(['foo', '--foo=1']),
  275. ['foo', '--foo=1'],
  276. )
  277. def test_handle_argv(self):
  278. x = CeleryCommand(app=self.app)
  279. x.execute = Mock()
  280. x.handle_argv('celery', [])
  281. x.execute.assert_called_with('help', ['help'])
  282. x.handle_argv('celery', ['start', 'foo'])
  283. x.execute.assert_called_with('start', ['start', 'foo'])
  284. def test_execute(self):
  285. x = CeleryCommand(app=self.app)
  286. Help = x.commands['help'] = Mock()
  287. help = Help.return_value = Mock()
  288. x.execute('fooox', ['a'])
  289. help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
  290. help.reset()
  291. x.execute('help', ['help'])
  292. help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
  293. Dummy = x.commands['dummy'] = Mock()
  294. dummy = Dummy.return_value = Mock()
  295. dummy.run_from_argv.side_effect = Error('foo', status='EX_FAILURE')
  296. help.reset()
  297. x.execute('dummy', ['dummy'])
  298. dummy.run_from_argv.assert_called_with(
  299. x.prog_name, [], command='dummy',
  300. )
  301. help.run_from_argv.assert_called_with(
  302. x.prog_name, [], command='help',
  303. )
  304. exc = dummy.run_from_argv.side_effect = x.UsageError('foo')
  305. x.on_usage_error = Mock()
  306. x.execute('dummy', ['dummy'])
  307. x.on_usage_error.assert_called_with(exc)
  308. def test_on_usage_error(self):
  309. x = CeleryCommand(app=self.app)
  310. x.error = Mock()
  311. x.on_usage_error(x.UsageError('foo'), command=None)
  312. self.assertTrue(x.error.called)
  313. x.on_usage_error(x.UsageError('foo'), command='dummy')
  314. def test_prepare_prog_name(self):
  315. x = CeleryCommand(app=self.app)
  316. main = Mock(name='__main__')
  317. main.__file__ = '/opt/foo.py'
  318. with patch.dict(sys.modules, __main__=main):
  319. self.assertEqual(x.prepare_prog_name('__main__.py'), '/opt/foo.py')
  320. self.assertEqual(x.prepare_prog_name('celery'), 'celery')
  321. class test_RemoteControl(AppCase):
  322. def test_call_interface(self):
  323. with self.assertRaises(NotImplementedError):
  324. _RemoteControl(app=self.app).call()
  325. class test_inspect(AppCase):
  326. def test_usage(self):
  327. self.assertTrue(inspect(app=self.app).usage('foo'))
  328. def test_command_info(self):
  329. i = inspect(app=self.app)
  330. self.assertTrue(i.get_command_info(
  331. 'ping', help=True, color=i.colored.red,
  332. ))
  333. def test_list_commands_color(self):
  334. i = inspect(app=self.app)
  335. self.assertTrue(i.list_commands(
  336. help=True, color=i.colored.red,
  337. ))
  338. self.assertTrue(i.list_commands(
  339. help=False, color=None,
  340. ))
  341. def test_epilog(self):
  342. self.assertTrue(inspect(app=self.app).epilog)
  343. def test_do_call_method_sql_transport_type(self):
  344. self.app.connection = Mock()
  345. conn = self.app.connection.return_value = Mock(name='Connection')
  346. conn.transport.driver_type = 'sql'
  347. i = inspect(app=self.app)
  348. with self.assertRaises(i.Error):
  349. i.do_call_method(['ping'])
  350. def test_say_directions(self):
  351. i = inspect(self.app)
  352. i.out = Mock()
  353. i.quiet = True
  354. i.say_chat('<-', 'hello out')
  355. self.assertFalse(i.out.called)
  356. i.say_chat('->', 'hello in')
  357. self.assertTrue(i.out.called)
  358. i.quiet = False
  359. i.out.reset_mock()
  360. i.say_chat('<-', 'hello out', 'body')
  361. self.assertTrue(i.out.called)
  362. @patch('celery.app.control.Control.inspect')
  363. def test_run(self, real):
  364. out = WhateverIO()
  365. i = inspect(app=self.app, stdout=out)
  366. with self.assertRaises(Error):
  367. i.run()
  368. with self.assertRaises(Error):
  369. i.run('help')
  370. with self.assertRaises(Error):
  371. i.run('xyzzybaz')
  372. i.run('ping')
  373. self.assertTrue(real.called)
  374. i.run('ping', destination='foo,bar')
  375. self.assertEqual(real.call_args[1]['destination'], ['foo', 'bar'])
  376. self.assertEqual(real.call_args[1]['timeout'], 0.2)
  377. callback = real.call_args[1]['callback']
  378. callback({'foo': {'ok': 'pong'}})
  379. self.assertIn('OK', out.getvalue())
  380. instance = real.return_value = Mock()
  381. instance.ping.return_value = None
  382. with self.assertRaises(Error):
  383. i.run('ping')
  384. out.seek(0)
  385. out.truncate()
  386. i.quiet = True
  387. i.say_chat('<-', 'hello')
  388. self.assertFalse(out.getvalue())
  389. class test_control(AppCase):
  390. def control(self, patch_call, *args, **kwargs):
  391. kwargs.setdefault('app', Mock(name='app'))
  392. c = control(*args, **kwargs)
  393. if patch_call:
  394. c.call = Mock(name='control.call')
  395. return c
  396. def test_call(self):
  397. i = self.control(False)
  398. i.call('foo', 1, kw=2)
  399. i.app.control.foo.assert_called_with(1, kw=2, reply=True)
  400. def test_pool_grow(self):
  401. i = self.control(True)
  402. i.pool_grow('pool_grow', n=2)
  403. i.call.assert_called_with('pool_grow', 2)
  404. def test_pool_shrink(self):
  405. i = self.control(True)
  406. i.pool_shrink('pool_shrink', n=2)
  407. i.call.assert_called_with('pool_shrink', 2)
  408. def test_autoscale(self):
  409. i = self.control(True)
  410. i.autoscale('autoscale', max=3, min=2)
  411. i.call.assert_called_with('autoscale', 3, 2)
  412. def test_rate_limit(self):
  413. i = self.control(True)
  414. i.rate_limit('rate_limit', 'proj.add', '1/s')
  415. i.call.assert_called_with('rate_limit', 'proj.add', '1/s')
  416. def test_time_limit(self):
  417. i = self.control(True)
  418. i.time_limit('time_limit', 'proj.add', 10, 30)
  419. i.call.assert_called_with('time_limit', 'proj.add', 10, 30)
  420. def test_add_consumer(self):
  421. i = self.control(True)
  422. i.add_consumer(
  423. 'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
  424. durable=True,
  425. )
  426. i.call.assert_called_with(
  427. 'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
  428. durable=True,
  429. )
  430. def test_cancel_consumer(self):
  431. i = self.control(True)
  432. i.cancel_consumer('cancel_consumer', 'queue')
  433. i.call.assert_called_with('cancel_consumer', 'queue')
  434. class test_multi(AppCase):
  435. def test_get_options(self):
  436. self.assertTupleEqual(multi(app=self.app).get_options(), ())
  437. def test_run_from_argv(self):
  438. with patch('celery.bin.multi.MultiTool') as MultiTool:
  439. m = MultiTool.return_value = Mock()
  440. multi(self.app).run_from_argv('celery', ['arg'], command='multi')
  441. m.execute_from_commandline.assert_called_with(
  442. ['multi', 'arg'], 'celery',
  443. )
  444. class test_main(AppCase):
  445. @patch('celery.bin.celery.CeleryCommand')
  446. def test_main(self, Command):
  447. cmd = Command.return_value = Mock()
  448. mainfun()
  449. cmd.execute_from_commandline.assert_called_with(None)
  450. @patch('celery.bin.celery.CeleryCommand')
  451. def test_main_KeyboardInterrupt(self, Command):
  452. cmd = Command.return_value = Mock()
  453. cmd.execute_from_commandline.side_effect = KeyboardInterrupt()
  454. mainfun()
  455. cmd.execute_from_commandline.assert_called_with(None)
  456. class test_compat(AppCase):
  457. def test_compat_command_decorator(self):
  458. with patch('celery.bin.celery.CeleryCommand') as CC:
  459. self.assertEqual(command(), CC.register_command)
  460. fun = Mock(name='fun')
  461. command(fun)
  462. CC.register_command.assert_called_with(fun)