test_celeryd.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. import logging
  2. import os
  3. import sys
  4. import unittest2 as unittest
  5. from multiprocessing import get_logger, current_process
  6. from StringIO import StringIO
  7. from celery import platform
  8. from celery import signals
  9. from celery.app import default_app
  10. from celery.apps import worker as cd
  11. from celery.bin.celeryd import WorkerCommand, main as celeryd_main
  12. from celery.exceptions import ImproperlyConfigured
  13. from celery.utils import patch
  14. from celery.utils.functional import wraps
  15. from celery.tests.compat import catch_warnings
  16. from celery.tests.utils import execute_context
  17. patch.ensure_process_aware_logger()
  18. def disable_stdouts(fun):
  19. @wraps(fun)
  20. def disable(*args, **kwargs):
  21. sys.stdout, sys.stderr = StringIO(), StringIO()
  22. try:
  23. return fun(*args, **kwargs)
  24. finally:
  25. sys.stdout = sys.__stdout__
  26. sys.stderr = sys.__stderr__
  27. return disable
  28. class _WorkController(object):
  29. def __init__(self, *args, **kwargs):
  30. pass
  31. def start(self):
  32. pass
  33. class Worker(cd.Worker):
  34. WorkController = _WorkController
  35. class test_Worker(unittest.TestCase):
  36. Worker = Worker
  37. @disable_stdouts
  38. def test_queues_string(self):
  39. worker = self.Worker(queues="foo,bar,baz")
  40. worker.init_queues()
  41. self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
  42. @disable_stdouts
  43. def test_loglevel_string(self):
  44. worker = self.Worker(loglevel="INFO")
  45. self.assertEqual(worker.loglevel, logging.INFO)
  46. def test_run_worker(self):
  47. handlers = {}
  48. def i(sig, handler):
  49. handlers[sig] = handler
  50. p = platform.install_signal_handler
  51. platform.install_signal_handler = i
  52. try:
  53. w = self.Worker()
  54. w._isatty = False
  55. w.run_worker()
  56. for sig in "SIGINT", "SIGHUP", "SIGTERM":
  57. self.assertIn(sig, handlers)
  58. handlers.clear()
  59. w = self.Worker()
  60. w._isatty = True
  61. w.run_worker()
  62. for sig in "SIGINT", "SIGTERM":
  63. self.assertIn(sig, handlers)
  64. self.assertNotIn("SIGHUP", handlers)
  65. finally:
  66. platform.install_signal_handler = p
  67. @disable_stdouts
  68. def test_startup_info(self):
  69. worker = self.Worker()
  70. worker.run()
  71. self.assertTrue(worker.startup_info())
  72. worker.loglevel = logging.DEBUG
  73. self.assertTrue(worker.startup_info())
  74. worker.loglevel = logging.INFO
  75. self.assertTrue(worker.startup_info())
  76. @disable_stdouts
  77. def test_run(self):
  78. self.Worker().run()
  79. self.Worker(discard=True).run()
  80. worker = self.Worker()
  81. worker.init_loader()
  82. worker.settings.DEBUG = True
  83. def with_catch_warnings(log):
  84. worker.run()
  85. self.assertIn("memory leak", log[0].message.args[0])
  86. context = catch_warnings(record=True)
  87. execute_context(context, with_catch_warnings)
  88. worker.settings.DEBUG = False
  89. @disable_stdouts
  90. def test_purge_messages(self):
  91. self.Worker().purge_messages()
  92. @disable_stdouts
  93. def test_init_queues(self):
  94. c = default_app.conf
  95. p, c.CELERY_QUEUES = c.CELERY_QUEUES, {
  96. "celery": {"exchange": "celery",
  97. "binding_key": "celery"},
  98. "video": {"exchange": "video",
  99. "binding_key": "video"}}
  100. try:
  101. worker = self.Worker(queues=["video"])
  102. worker.init_queues()
  103. self.assertIn("video", worker.queues)
  104. self.assertNotIn("celery", worker.queues)
  105. c.CELERY_CREATE_MISSING_QUEUES = False
  106. self.assertRaises(ImproperlyConfigured,
  107. self.Worker(queues=["image"]).init_queues)
  108. c.CELERY_CREATE_MISSING_QUEUES = True
  109. worker = self.Worker(queues=["image"])
  110. worker.init_queues()
  111. self.assertIn("image", worker.queues)
  112. finally:
  113. c.CELERY_QUEUES = p
  114. @disable_stdouts
  115. def test_on_listener_ready(self):
  116. worker_ready_sent = [False]
  117. def on_worker_ready(**kwargs):
  118. worker_ready_sent[0] = True
  119. signals.worker_ready.connect(on_worker_ready)
  120. self.Worker().on_listener_ready(object())
  121. self.assertTrue(worker_ready_sent[0])
  122. class test_funs(unittest.TestCase):
  123. @disable_stdouts
  124. def test_set_process_status(self):
  125. prev1, sys.argv = sys.argv, ["Arg0"]
  126. try:
  127. st = cd.set_process_status("Running")
  128. self.assertIn("celeryd", st)
  129. self.assertIn("Running", st)
  130. prev2, sys.argv = sys.argv, ["Arg0", "Arg1"]
  131. try:
  132. st = cd.set_process_status("Running")
  133. self.assertIn("celeryd", st)
  134. self.assertIn("Running", st)
  135. self.assertIn("Arg1", st)
  136. finally:
  137. sys.argv = prev2
  138. finally:
  139. sys.argv = prev1
  140. @disable_stdouts
  141. def test_parse_options(self):
  142. cmd = WorkerCommand()
  143. opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
  144. self.assertEqual(opts.concurrency, 512)
  145. @disable_stdouts
  146. def test_run_worker(self):
  147. p, cd.Worker = cd.Worker, Worker
  148. try:
  149. cd.run_worker(discard=True)
  150. finally:
  151. cd.Worker = p
  152. @disable_stdouts
  153. def test_main(self):
  154. p, cd.Worker = cd.Worker, Worker
  155. s, sys.argv = sys.argv, ["celeryd", "--discard"]
  156. try:
  157. celeryd_main()
  158. finally:
  159. cd.Worker = p
  160. sys.argv = s
  161. class test_signal_handlers(unittest.TestCase):
  162. class _Worker(object):
  163. stopped = False
  164. terminated = False
  165. logger = get_logger()
  166. def stop(self):
  167. self.stopped = True
  168. def terminate(self):
  169. self.terminated = True
  170. def psig(self, fun, *args, **kwargs):
  171. handlers = {}
  172. def i(sig, handler):
  173. handlers[sig] = handler
  174. p, platform.install_signal_handler = platform.install_signal_handler, i
  175. try:
  176. fun(*args, **kwargs)
  177. return handlers
  178. finally:
  179. platform.install_signal_handler = p
  180. @disable_stdouts
  181. def test_worker_int_handler(self):
  182. worker = self._Worker()
  183. handlers = self.psig(cd.install_worker_int_handler, worker)
  184. next_handlers = {}
  185. def i(sig, handler):
  186. next_handlers[sig] = handler
  187. p = platform.install_signal_handler
  188. platform.install_signal_handler = i
  189. try:
  190. self.assertRaises(SystemExit, handlers["SIGINT"],
  191. "SIGINT", object())
  192. self.assertTrue(worker.stopped)
  193. finally:
  194. platform.install_signal_handler = p
  195. self.assertRaises(SystemExit, next_handlers["SIGINT"],
  196. "SIGINT", object())
  197. self.assertTrue(worker.terminated)
  198. @disable_stdouts
  199. def test_worker_int_handler_only_stop_MainProcess(self):
  200. process = current_process()
  201. name, process.name = process.name, "OtherProcess"
  202. try:
  203. worker = self._Worker()
  204. handlers = self.psig(cd.install_worker_int_handler, worker)
  205. self.assertRaises(SystemExit, handlers["SIGINT"],
  206. "SIGINT", object())
  207. self.assertFalse(worker.stopped)
  208. finally:
  209. process.name = name
  210. @disable_stdouts
  211. def test_worker_int_again_handler_only_stop_MainProcess(self):
  212. process = current_process()
  213. name, process.name = process.name, "OtherProcess"
  214. try:
  215. worker = self._Worker()
  216. handlers = self.psig(cd.install_worker_int_again_handler, worker)
  217. self.assertRaises(SystemExit, handlers["SIGINT"],
  218. "SIGINT", object())
  219. self.assertFalse(worker.terminated)
  220. finally:
  221. process.name = name
  222. @disable_stdouts
  223. def test_worker_term_handler(self):
  224. worker = self._Worker()
  225. handlers = self.psig(cd.install_worker_term_handler, worker)
  226. self.assertRaises(SystemExit, handlers["SIGTERM"],
  227. "SIGTERM", object())
  228. self.assertTrue(worker.stopped)
  229. @disable_stdouts
  230. def test_worker_term_handler_only_stop_MainProcess(self):
  231. process = current_process()
  232. name, process.name = process.name, "OtherProcess"
  233. try:
  234. worker = self._Worker()
  235. handlers = self.psig(cd.install_worker_term_handler, worker)
  236. self.assertRaises(SystemExit, handlers["SIGTERM"],
  237. "SIGTERM", object())
  238. self.assertFalse(worker.stopped)
  239. finally:
  240. process.name = name
  241. @disable_stdouts
  242. def test_worker_restart_handler(self):
  243. argv = []
  244. def _execv(*args):
  245. argv.extend(args)
  246. execv, os.execv = os.execv, _execv
  247. try:
  248. worker = self._Worker()
  249. handlers = self.psig(cd.install_worker_restart_handler, worker)
  250. handlers["SIGHUP"]("SIGHUP", object())
  251. self.assertTrue(worker.stopped)
  252. self.assertTrue(argv)
  253. finally:
  254. os.execv = execv