test_celery.py 10 KB


  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. from anyjson import dumps
  4. from datetime import datetime
  5. from mock import Mock, patch
  6. from celery import task
  7. from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
  8. from celery.bin.celery import (
  9. Command,
  10. Error,
  11. worker,
  12. list_,
  13. apply,
  14. purge,
  15. result,
  16. inspect,
  17. status,
  18. migrate,
  19. help,
  20. report,
  21. CeleryCommand,
  22. determine_exit_status,
  23. main,
  24. )
  25. from celery.tests.utils import AppCase, WhateverIO
  @task
  def add(x, y):
      # Trivial task fixture: wrapped by celery's ``task`` decorator so the
      # tests have a task object (e.g. ``add.name``) to refer to.
      total = x + y
      return total
  class test_Command(AppCase):
      """Tests for the base ``Command`` class and the ``Error`` exception."""

      def setup(self):
          # Per-test fixture: a Command wired to in-memory streams so the
          # tests can inspect what was written.
          # NOTE(review): presumably AppCase invokes ``setup`` before each
          # test -- confirm against celery.tests.utils.
          self.out = WhateverIO()
          self.err = WhateverIO()
          self.cmd = Command(self.app, stdout=self.out, stderr=self.err)

      def test_Error_repr(self):
          # An Error built from only a reason must still carry a status,
          # keep the reason, and have a non-empty string form.
          x = Error("something happened")
          self.assertIsNotNone(x.status)
          self.assertTrue(x.reason)
          self.assertTrue(str(x))

      def test_show_help(self):
          # show_help(name) should re-dispatch as "<name> --help" through
          # run_from_argv and report EX_USAGE.
          self.cmd.run_from_argv = Mock()
          self.assertEqual(self.cmd.show_help("foo"), EX_USAGE)
          self.cmd.run_from_argv.assert_called_with(
              self.cmd.prog_name, ["foo", "--help"]
          )

      def test_error(self):
          # ``out`` is replaced by a stub to observe that error() calls it.
          self.cmd.out = Mock()
          self.cmd.error("FOO")
          self.assertTrue(self.cmd.out.called)

      def test_out(self):
          f = Mock()
          # Text without a trailing newline gets one appended.
          self.cmd.out("foo", f)
          f.write.assert_called_with("foo\n")
          # Forget the first write so the next assertion can only be
          # satisfied by the second call.
          f.reset_mock()
          self.cmd.out("foo\n", f)
          # NOTE(review): assumes out() leaves an existing trailing newline
          # alone instead of doubling it -- confirm against Command.out.
          f.write.assert_called_with("foo\n")

      def test_call(self):
          self.cmd.run = Mock()
          # A run() that returns None maps to a successful exit status.
          self.cmd.run.return_value = None
          self.assertEqual(self.cmd(), EX_OK)
          # An Error raised by run() is turned into that Error's status.
          self.cmd.run.side_effect = Error("error", EX_FAILURE)
          self.assertEqual(self.cmd(), EX_FAILURE)

      def test_run_from_argv(self):
          # The base class leaves run_from_argv unimplemented, yet the
          # program name must already have been recorded when it raises.
          with self.assertRaises(NotImplementedError):
              self.cmd.run_from_argv("prog", ["foo", "bar"])
          self.assertEqual(self.cmd.prog_name, "prog")

      def test_prettify_list(self):
          # Index 1 of the prettify() result is the rendered text.
          self.assertEqual(self.cmd.prettify([])[1], "- empty -")
          self.assertIn("bar", self.cmd.prettify(["foo", "bar"])[1])

      def test_prettify_dict(self):
          # Index 0 of the prettify() result reflects the "ok"/"error" key.
          self.assertIn(
              "OK",
              str(self.cmd.prettify({"ok": "the quick brown fox"})[0]))
          self.assertIn(
              "ERROR",
              str(self.cmd.prettify({"error": "the quick brown fox"})[0]))

      def test_prettify(self):
          # Strings, arbitrary objects and dicts without "ok"/"error" keys
          # are all reported as OK.
          self.assertIn("OK", str(self.cmd.prettify("the quick brown")))
          self.assertIn("OK", str(self.cmd.prettify(object())))
          self.assertIn("OK", str(self.cmd.prettify({"foo": "bar"})))
  class test_Delegate(AppCase):
      """Checks the ``worker`` command's option list and run delegation."""

      def test_get_options(self):
          # The command must expose a non-empty option list.
          options = worker().get_options()
          self.assertTrue(options)

      def test_run(self):
          # run() with no arguments must call the target's run() the same way.
          delegate = worker()
          delegate.target.run = Mock()
          delegate.run()
          delegate.target.run.assert_called_with()
  class test_list(AppCase):
      """Tests for the ``list_`` command."""

      def test_list_bindings_no_support(self):
          # A management object whose get_bindings raises
          # NotImplementedError must surface as a command Error.
          management = Mock()
          management.get_bindings.side_effect = NotImplementedError()
          cmd = list_(app=self.app, stderr=WhateverIO())
          with self.assertRaises(Error):
              cmd.list_bindings(management)

      def test_run(self):
          cmd = list_(app=self.app, stderr=WhateverIO())
          # "bindings" is accepted ...
          cmd.run("bindings")
          # ... while a missing or unknown topic is rejected.
          for bad_topic in (None, "foo"):
              with self.assertRaises(Error):
                  cmd.run(bad_topic)
  class test_apply(AppCase):
      """Tests for the ``apply`` command, with ``send_task`` patched out."""

      @patch("celery.app.base.Celery.send_task")
      def test_run(self, send_task):
          cmd = apply(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())

          def sent(option):
              # Keyword argument of the most recent send_task call.
              return send_task.call_args[1][option]

          # Bare task name: the task is sent.
          cmd.run("tasks.add")
          self.assertTrue(send_task.called)

          # Serialized args/kwargs arrive at send_task decoded.
          positional = [4, 4]
          named = {"x": 2, "y": 2}
          cmd.run("tasks.add", args=dumps(positional), kwargs=dumps(named))
          self.assertEqual(sent("args"), positional)
          self.assertEqual(sent("kwargs"), named)

          # Numeric expires/countdown are passed through unchanged.
          cmd.run("tasks.add", expires=10, countdown=10)
          self.assertEqual(sent("expires"), 10)
          self.assertEqual(sent("countdown"), 10)

          # An ISO-8601 expires string comes out as the datetime it encodes.
          now = datetime.now()
          cmd.run("tasks.add", expires=now.isoformat())
          self.assertEqual(sent("expires"), now)

          # Anything else is rejected.
          with self.assertRaises(ValueError):
              cmd.run("tasks.add", expires="foobaribazibar")
  class test_purge(AppCase):
      """Tests for the ``purge`` command, with ``discard_all`` patched out."""

      @patch("celery.app.control.Control.discard_all")
      def test_run(self, discard_all):
          stream = WhateverIO()
          cmd = purge(app=self.app, stdout=stream)

          # (messages reported as discarded, text expected in the output)
          expectations = (
              (0, "No messages purged"),
              (100, "100 messages"),
          )
          for purged, message in expectations:
              discard_all.return_value = purged
              cmd.run()
              self.assertIn(message, stream.getvalue())
  class test_result(AppCase):
      """Tests for the ``result`` command, with ``AsyncResult.get`` patched."""

      @patch("celery.result.AsyncResult.get")
      def test_run(self, get):
          stream = WhateverIO()
          cmd = result(app=self.app, stdout=stream)

          # Lookup by id only: the fetched value is printed.
          get.return_value = "Jerry"
          cmd.run("id")
          self.assertIn("Jerry", stream.getvalue())

          # Lookup by id with an explicit task name.
          get.return_value = "Elaine"
          cmd.run("id", task=add.name)
          self.assertIn("Elaine", stream.getvalue())
  class test_status(AppCase):
      """Tests for the ``status`` command, with ``inspect`` patched out."""

      @patch("celery.bin.celery.inspect")
      def test_run(self, inspect_):
          out = WhateverIO()
          err = WhateverIO()
          inspector = Mock()
          inspect_.return_value = inspector

          # No replies at all is reported as an Error.
          inspector.run.return_value = []
          cmd = status(self.app, stdout=out, stderr=err)
          with self.assertRaises(Error):
              cmd.run()

          # Three replies are summarised in the output.
          inspector.run.return_value = ["a", "b", "c"]
          cmd.run()
          self.assertIn("3 nodes online", out.getvalue())

          # Quiet mode must run without raising.
          cmd.run(quiet=True)
  class test_migrate(AppCase):
      """Tests for the ``migrate`` command, with ``migrate_tasks`` patched."""

      @patch("celery.contrib.migrate.migrate_tasks")
      def test_run(self, migrate_tasks):
          stream = WhateverIO()
          cmd = migrate(app=self.app, stdout=stream, stderr=WhateverIO())

          # Without source/destination the command exits and migrates nothing.
          with self.assertRaises(SystemExit):
              cmd.run()
          self.assertFalse(migrate_tasks.called)

          # With both URLs the migration is started.
          cmd.run("memory://foo", "memory://bar")
          self.assertTrue(migrate_tasks.called)

          # The progress callback prints "<count>/<strtotal>".
          progress = Mock()
          progress.strtotal = 30
          progress.count = 10
          task_body = {"task": "tasks.add", "id": "ID"}
          cmd.on_migrate_task(progress, task_body, None)
          self.assertIn("10/30", stream.getvalue())
  class test_report(AppCase):
      """Tests for the ``report`` command."""

      def test_run(self):
          # The command must succeed and write something to stdout.
          stream = WhateverIO()
          cmd = report(app=self.app, stdout=stream)
          exit_code = cmd.run()
          self.assertEqual(exit_code, EX_OK)
          self.assertTrue(stream.getvalue())
  class test_help(AppCase):
      """Tests for the ``help`` command."""

      def test_run(self):
          stream = WhateverIO()
          cmd = help(app=self.app, stdout=stream)
          # Stub the parser so print_help can be observed.
          cmd.parser = Mock()

          exit_code = cmd.run()
          self.assertEqual(exit_code, EX_USAGE)
          self.assertTrue(stream.getvalue())

          usage_text = cmd.usage("help")
          self.assertTrue(usage_text)
          cmd.parser.print_help.assert_called_with()
  class test_CeleryCommand(AppCase):
      """Tests for ``CeleryCommand`` and ``determine_exit_status``."""

      def test_execute_from_commandline(self):
          # Whatever handle_argv returns -- or if it is interrupted -- the
          # entry point must finish by raising SystemExit.
          x = CeleryCommand(app=self.app)
          x.handle_argv = Mock()
          x.handle_argv.return_value = 1
          with self.assertRaises(SystemExit):
              x.execute_from_commandline()
          x.handle_argv.return_value = True
          with self.assertRaises(SystemExit):
              x.execute_from_commandline()
          x.handle_argv.side_effect = KeyboardInterrupt()
          with self.assertRaises(SystemExit):
              x.execute_from_commandline()

      def test_determine_exit_status(self):
          # A truthy return value maps to EX_OK, a falsy one to EX_FAILURE.
          self.assertEqual(determine_exit_status("true"), EX_OK)
          self.assertEqual(determine_exit_status(""), EX_FAILURE)

      def test_remove_options_at_beginning(self):
          x = CeleryCommand(app=self.app)
          # No argv at all yields an empty list.
          self.assertEqual(x.remove_options_at_beginning(None), [])
          # Option-only argv is stripped entirely.
          self.assertEqual(
              x.remove_options_at_beginning(["-c 3", "--foo"]), [])
          self.assertEqual(
              x.remove_options_at_beginning(["--foo", "-c 3"]), [])
          # Options that follow the first non-option word are kept.
          self.assertEqual(
              x.remove_options_at_beginning(["foo", "--foo=1"]),
              ["foo", "--foo=1"])

      def test_handle_argv(self):
          x = CeleryCommand(app=self.app)
          x.execute = Mock()
          # Empty argv falls back to the help command.
          x.handle_argv("celery", [])
          x.execute.assert_called_with("help", ["help"])
          # Otherwise the first word names the command to execute.
          x.handle_argv("celery", ["start", "foo"])
          x.execute.assert_called_with("start", ["start", "foo"])

      def test_execute(self):
          x = CeleryCommand(app=self.app)
          Help, Dummy = Mock(), Mock()
          # ``help_cmd`` rather than ``help``: the latter would shadow the
          # ``help`` command imported at module level.
          help_cmd = Help.return_value = Mock()
          dummy = Dummy.return_value = Mock()
          # Use the numeric constant, as test_Command.test_call does, rather
          # than the string "EX_FAILURE".
          dummy.run_from_argv.side_effect = Error("foo", status=EX_FAILURE)

          # patch.dict puts the original entries back afterwards.
          # NOTE(review): ``x.commands`` may be a registry shared between
          # instances -- confirm; restoring it is harmless either way.
          with patch.dict(x.commands, {"help": Help, "dummy": Dummy}):
              # An unknown command is answered by the help command.
              x.execute("fooox", ["a"])
              help_cmd.run_from_argv.assert_called_with(
                  x.prog_name, ["help"])

              # reset_mock() really clears the recorded calls; ``reset()``
              # is not part of the Mock API in mock >= 0.7 and would only
              # create and call a child mock.
              help_cmd.reset_mock()
              x.execute("help", ["help"])
              help_cmd.run_from_argv.assert_called_with(
                  x.prog_name, ["help"])

              # A command that raises Error is run first; help is then
              # invoked with the same argv.
              help_cmd.reset_mock()
              x.execute("dummy", ["dummy"])
              dummy.run_from_argv.assert_called_with(
                  x.prog_name, ["dummy"])
              help_cmd.run_from_argv.assert_called_with(
                  x.prog_name, ["dummy"])
  class test_inspect(AppCase):
      """Tests for the ``inspect`` command."""

      def test_usage(self):
          usage_text = inspect(app=self.app).usage("foo")
          self.assertTrue(usage_text)

      @patch("celery.app.control.Control.inspect")
      def test_run(self, real):
          stream = WhateverIO()
          cmd = inspect(app=self.app, stdout=stream)

          # No method, "help", and an unknown method are all rejected.
          for bad_args in ((), ("help",), ("xyzzybaz",)):
              with self.assertRaises(Error):
                  cmd.run(*bad_args)

          # A known method reaches the patched Control.inspect.
          cmd.run("ping")
          self.assertTrue(real.called)

          # A comma-separated destination is split; timeout is 0.2 here.
          cmd.run("ping", destination="foo,bar")
          passed = real.call_args[1]
          self.assertEqual(passed["destination"], ["foo", "bar"])
          self.assertEqual(passed["timeout"], 0.2)

          # Feeding a reply to the callback prints an OK line.
          on_reply = passed["callback"]
          on_reply({"foo": {"ok": "pong"}})
          self.assertIn("OK", stream.getvalue())

          # A ping that yields no replies is an Error.
          inspector = Mock()
          real.return_value = inspector
          inspector.ping.return_value = None
          with self.assertRaises(Error):
              cmd.run("ping")

          # Empty the stream, then check that quiet mode suppresses say().
          stream.seek(0)
          stream.truncate()
          cmd.quiet = True
          cmd.say('<-', "hello")
          self.assertFalse(stream.getvalue())
  class test_main(AppCase):
      """Tests for the module-level ``main`` entry point."""

      @patch("celery.bin.celery.CeleryCommand")
      def test_main(self, Command):
          # main() must instantiate CeleryCommand (patched here) and call
          # execute_from_commandline() on it with no arguments.
          instance = Mock()
          Command.return_value = instance
          main()
          instance.execute_from_commandline.assert_called_with()