test_celery.py 9.9 KB


  1. from __future__ import absolute_import
  2. from anyjson import dumps
  3. from datetime import datetime
  4. from mock import Mock, patch
  5. from celery import task
  6. from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
  7. from celery.bin.celery import (
  8. Command,
  9. Error,
  10. worker,
  11. list_,
  12. call,
  13. purge,
  14. result,
  15. inspect,
  16. status,
  17. migrate,
  18. help,
  19. report,
  20. CeleryCommand,
  21. determine_exit_status,
  22. main,
  23. )
  24. from celery.tests.utils import AppCase, WhateverIO
  @task()
  def add(x, y):
      # Minimal task used as a fixture; test_result refers to it via
      # ``add.name``.
      total = x + y
      return total
  class test_Command(AppCase):
      """Tests for the base ``Command`` class and the ``Error`` exception."""

      def setup(self):
          # Hand the command WhateverIO objects for both stdout and stderr.
          self.out = WhateverIO()
          self.err = WhateverIO()
          self.cmd = Command(self.app, stdout=self.out, stderr=self.err)

      def test_Error_repr(self):
          # An Error built from a message alone still has a status, a
          # reason and a non-empty string form.
          exc = Error('something happened')
          self.assertIsNotNone(exc.status)
          self.assertTrue(exc.reason)
          self.assertTrue(str(exc))

      def test_show_help(self):
          # show_help() re-runs the command with ``--help`` appended and
          # reports a usage exit status.
          run_from_argv = self.cmd.run_from_argv = Mock()
          self.assertEqual(self.cmd.show_help('foo'), EX_USAGE)
          run_from_argv.assert_called_with(
              self.cmd.prog_name, ['foo', '--help'],
          )

      def test_error(self):
          # error() is expected to go through out().
          self.cmd.out = Mock()
          self.cmd.error('FOO')
          self.assertTrue(self.cmd.out.called)

      def test_out(self):
          # Smoke test: out() accepts an explicit file-like target.
          self.cmd.out('foo', Mock())

      def test_call(self):
          # A run() returning None maps to EX_OK ...
          self.cmd.run = Mock(return_value=None)
          self.assertEqual(self.cmd(), EX_OK)

          # ... and a raised Error maps to the status it carries.
          self.cmd.run.side_effect = Error('error', EX_FAILURE)
          self.assertEqual(self.cmd(), EX_FAILURE)

      def test_run_from_argv(self):
          # The base class does not implement argv handling, but the
          # program name is recorded before it raises.
          with self.assertRaises(NotImplementedError):
              self.cmd.run_from_argv('prog', ['foo', 'bar'])
          self.assertEqual(self.cmd.prog_name, 'prog')

      def test_prettify_list(self):
          empty_text = self.cmd.prettify([])[1]
          self.assertEqual(empty_text, '- empty -')

          filled_text = self.cmd.prettify(['foo', 'bar'])[1]
          self.assertIn('bar', filled_text)

      def test_prettify_dict(self):
          # The 'ok' / 'error' key selects the label in the first element.
          for key, label in (('ok', 'OK'), ('error', 'ERROR')):
              pretty = self.cmd.prettify({key: 'the quick brown fox'})
              self.assertIn(label, str(pretty[0]))

      def test_prettify(self):
          # Anything else is labelled OK.
          for value in ('the quick brown', object(), {'foo': 'bar'}):
              self.assertIn('OK', str(self.cmd.prettify(value)))
  class test_Delegate(AppCase):
      """Tests for delegating commands, using ``worker`` as the example."""

      def test_get_options(self):
          # A delegating command must expose a non-empty option list.
          self.assertTrue(worker(app=self.app).get_options())

      def test_run(self):
          # Build the command against the test-case app, like every other
          # test in this module, rather than with no app argument.
          w = worker(app=self.app)
          w.target.run = Mock()
          w.run()
          # run() with no arguments is forwarded to the target unchanged.
          w.target.run.assert_called_with()
  class test_list(AppCase):
      """Tests for the ``list`` command (imported here as ``list_``)."""

      def test_list_bindings_no_support(self):
          # A management object that cannot list bindings is reported to
          # the caller as a command Error.
          lister = list_(app=self.app, stderr=WhateverIO())
          management = Mock()
          management.get_bindings.side_effect = NotImplementedError()
          with self.assertRaises(Error):
              lister.list_bindings(management)

      def test_run(self):
          lister = list_(app=self.app, stderr=WhateverIO())
          lister.run('bindings')

          # A missing or unknown topic is rejected.
          for topic in (None, 'foo'):
              with self.assertRaises(Error):
                  lister.run(topic)
  class test_call(AppCase):
      """Tests for the ``call`` command with ``Celery.send_task`` patched."""

      @patch('celery.app.base.Celery.send_task')
      def test_run(self, send_task):
          cmd = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())

          # A bare task name is enough to send the task.
          cmd.run('tasks.add')
          self.assertTrue(send_task.called)

          # args/kwargs go in serialized and must reach send_task decoded.
          cmd.run(
              'tasks.add',
              args=dumps([4, 4]),
              kwargs=dumps({'x': 2, 'y': 2}),
          )
          sent = send_task.call_args[1]
          self.assertEqual(sent['args'], [4, 4])
          self.assertEqual(sent['kwargs'], {'x': 2, 'y': 2})

          # Numeric expires/countdown are passed through as given.
          cmd.run('tasks.add', expires=10, countdown=10)
          sent = send_task.call_args[1]
          self.assertEqual(sent['expires'], 10)
          self.assertEqual(sent['countdown'], 10)

          # An ISO-8601 string for expires arrives as the equal datetime.
          now = datetime.now()
          cmd.run('tasks.add', expires=now.isoformat())
          self.assertEqual(send_task.call_args[1]['expires'], now)

          # Anything that is neither a number nor a timestamp is an error.
          with self.assertRaises(ValueError):
              cmd.run('tasks.add', expires='foobaribazibar')
  class test_purge(AppCase):
      """Tests for the ``purge`` command with ``Control.purge`` patched."""

      @patch('celery.app.control.Control.purge')
      def test_run(self, purge_):
          stdout = WhateverIO()
          cmd = purge(app=self.app, stdout=stdout)

          # The patched purge() return value drives the printed message.
          # Output accumulates in the same stream across both runs.
          for purged, expected in ((0, 'No messages purged'),
                                   (100, '100 messages')):
              purge_.return_value = purged
              cmd.run()
              self.assertIn(expected, stdout.getvalue())
  class test_result(AppCase):
      """Tests for the ``result`` command with ``AsyncResult.get`` patched."""

      @patch('celery.result.AsyncResult.get')
      def test_run(self, get):
          stdout = WhateverIO()
          cmd = result(app=self.app, stdout=stdout)

          # Lookup by id alone prints whatever get() returns.
          get.return_value = 'Jerry'
          cmd.run('id')
          self.assertIn('Jerry', stdout.getvalue())

          # Same check when the task name is supplied as well.
          get.return_value = 'Elaine'
          cmd.run('id', task=add.name)
          self.assertIn('Elaine', stdout.getvalue())
  class test_status(AppCase):
      """Tests for the ``status`` command with ``inspect`` patched out."""

      @patch('celery.bin.celery.inspect')
      def test_run(self, inspect_):
          stdout = WhateverIO()
          stderr = WhateverIO()
          ins = inspect_.return_value = Mock()
          cmd = status(self.app, stdout=stdout, stderr=stderr)

          # No replies at all is treated as an error.
          ins.run.return_value = []
          with self.assertRaises(Error):
              cmd.run()

          # Three replies are summarised as three nodes.
          ins.run.return_value = ['a', 'b', 'c']
          cmd.run()
          self.assertIn('3 nodes online', stdout.getvalue())

          # Smoke test: quiet mode must not raise.
          cmd.run(quiet=True)
  class test_migrate(AppCase):
      """Tests for the ``migrate`` command with ``migrate_tasks`` patched."""

      @patch('celery.contrib.migrate.migrate_tasks')
      def test_run(self, migrate_tasks):
          stdout = WhateverIO()
          cmd = migrate(app=self.app, stdout=stdout, stderr=WhateverIO())

          # Without both broker URLs the command exits before migrating.
          with self.assertRaises(SystemExit):
              cmd.run()
          self.assertFalse(migrate_tasks.called)

          # With source and destination given, migration is started.
          cmd.run('memory://foo', 'memory://bar')
          self.assertTrue(migrate_tasks.called)

          # The per-task callback prints progress as "count/strtotal".
          progress = Mock()
          progress.count = 10
          progress.strtotal = 30
          body = {'task': 'tasks.add', 'id': 'ID'}
          cmd.on_migrate_task(progress, body, None)
          self.assertIn('10/30', stdout.getvalue())
  class test_report(AppCase):
      """Tests for the ``report`` command."""

      def test_run(self):
          # report exits with EX_OK and writes something to stdout.
          stdout = WhateverIO()
          cmd = report(app=self.app, stdout=stdout)
          exit_status = cmd.run()
          self.assertEqual(exit_status, EX_OK)
          self.assertTrue(stdout.getvalue())
  class test_help(AppCase):
      """Tests for the ``help`` command."""

      def test_run(self):
          stdout = WhateverIO()
          cmd = help(app=self.app, stdout=stdout)
          # Replace the parser so print_help() can be observed.
          parser = cmd.parser = Mock()

          exit_status = cmd.run()
          self.assertEqual(exit_status, EX_USAGE)
          self.assertTrue(stdout.getvalue())
          self.assertTrue(cmd.usage('help'))
          parser.print_help.assert_called_with()
  class test_CeleryCommand(AppCase):
      """Tests for the ``CeleryCommand`` umbrella command and its helpers."""

      def test_execute_from_commandline(self):
          # Whatever handle_argv() yields -- an int, a bool, or a
          # KeyboardInterrupt -- the entry point ends in SystemExit.
          x = CeleryCommand(app=self.app)
          x.handle_argv = Mock()

          for retval in (1, True):
              x.handle_argv.return_value = retval
              with self.assertRaises(SystemExit):
                  x.execute_from_commandline()

          x.handle_argv.side_effect = KeyboardInterrupt()
          with self.assertRaises(SystemExit):
              x.execute_from_commandline()

      def test_determine_exit_status(self):
          # Truthy return values mean success, falsy ones failure.
          self.assertEqual(determine_exit_status('true'), EX_OK)
          self.assertEqual(determine_exit_status(''), EX_FAILURE)

      def test_remove_options_at_beginning(self):
          x = CeleryCommand(app=self.app)
          # None and option-only argument lists collapse to an empty list.
          self.assertEqual(x.remove_options_at_beginning(None), [])
          self.assertEqual(
              x.remove_options_at_beginning(['-c 3', '--foo']), [])
          self.assertEqual(
              x.remove_options_at_beginning(['--foo', '-c 3']), [])
          # Once a non-option argument leads, everything is kept.
          self.assertEqual(
              x.remove_options_at_beginning(['foo', '--foo=1']),
              ['foo', '--foo=1'])

      def test_handle_argv(self):
          x = CeleryCommand(app=self.app)
          x.execute = Mock()
          # An empty argv falls back to the help command.
          x.handle_argv('celery', [])
          x.execute.assert_called_with('help', ['help'])
          # Otherwise the first argument names the command to execute.
          x.handle_argv('celery', ['start', 'foo'])
          x.execute.assert_called_with('start', ['start', 'foo'])

      def test_execute(self):
          x = CeleryCommand(app=self.app)
          # Swap the registered help command class for a mock; the local is
          # named help_cmd so it does not shadow the imported ``help``.
          Help = x.commands['help'] = Mock()
          help_cmd = Help.return_value = Mock()

          # An unknown command name is routed to help.
          x.execute('fooox', ['a'])
          help_cmd.run_from_argv.assert_called_with(x.prog_name, ['help'])

          # reset_mock() clears the recorded calls. The previous
          # ``reset()`` only auto-created and called a child mock
          # attribute, so no call history was ever cleared.
          help_cmd.reset_mock()
          x.execute('help', ['help'])
          help_cmd.run_from_argv.assert_called_with(x.prog_name, ['help'])

          # A command that raises Error falls back to help, which receives
          # the failing command's argv.
          Dummy = x.commands['dummy'] = Mock()
          dummy = Dummy.return_value = Mock()
          # NOTE(review): status is the string 'EX_FAILURE', not the
          # EX_FAILURE constant -- kept as-is; confirm whether intended.
          dummy.run_from_argv.side_effect = Error('foo', status='EX_FAILURE')
          help_cmd.reset_mock()
          x.execute('dummy', ['dummy'])
          dummy.run_from_argv.assert_called_with(x.prog_name, ['dummy'])
          help_cmd.run_from_argv.assert_called_with(x.prog_name, ['dummy'])
  class test_inspect(AppCase):
      """Tests for the ``inspect`` command."""

      def test_usage(self):
          self.assertTrue(inspect(app=self.app).usage('foo'))

      @patch('celery.app.control.Control.inspect')
      def test_run(self, real):
          stdout = WhateverIO()
          cmd = inspect(app=self.app, stdout=stdout)

          # No method, the 'help' pseudo-method and an unknown method are
          # all rejected.
          for argv in ((), ('help',), ('xyzzybaz',)):
              with self.assertRaises(Error):
                  cmd.run(*argv)

          # A known method reaches Control.inspect.
          cmd.run('ping')
          self.assertTrue(real.called)

          # A comma-separated destination is split into a list, and a
          # timeout of 0.2 is passed along.
          cmd.run('ping', destination='foo,bar')
          passed = real.call_args[1]
          self.assertEqual(passed['destination'], ['foo', 'bar'])
          self.assertEqual(passed['timeout'], 0.2)

          # The reply callback prints an OK line for a successful reply.
          on_reply = passed['callback']
          on_reply({'foo': {'ok': 'pong'}})
          self.assertIn('OK', stdout.getvalue())

          # A None reply from the inspected method is an error.
          instance = real.return_value = Mock()
          instance.ping.return_value = None
          with self.assertRaises(Error):
              cmd.run('ping')

          # In quiet mode say() writes nothing; empty the stream first.
          stdout.seek(0)
          stdout.truncate()
          cmd.quiet = True
          cmd.say('<-', 'hello')
          self.assertFalse(stdout.getvalue())
  class test_main(AppCase):
      """Tests for the module-level ``main`` entry point."""

      @patch('celery.bin.celery.CeleryCommand')
      def test_main(self, Command):
          # main() builds a CeleryCommand and runs it with argv=None.
          instance = Mock()
          Command.return_value = instance
          main()
          instance.execute_from_commandline.assert_called_with(None)