test_celeryd.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. import logging
  2. import os
  3. import sys
  4. import unittest2 as unittest
  5. from multiprocessing import get_logger, current_process
  6. from StringIO import StringIO
  7. from celery import conf
  8. from celery import platforms
  9. from celery import signals
  10. from celery import log
  11. from celery.apps import worker as cd
  12. from celery.bin.celeryd import WorkerCommand, main as celeryd_main
  13. from celery.exceptions import ImproperlyConfigured
  14. from celery.utils import patch
  15. from celery.utils.functional import wraps
  16. from celery.tests.compat import catch_warnings
  17. from celery.tests.utils import execute_context
  18. patch.ensure_process_aware_logger()
  19. def disable_stdouts(fun):
  20. @wraps(fun)
  21. def disable(*args, **kwargs):
  22. sys.stdout, sys.stderr = StringIO(), StringIO()
  23. try:
  24. return fun(*args, **kwargs)
  25. finally:
  26. sys.stdout = sys.__stdout__
  27. sys.stderr = sys.__stderr__
  28. return disable
  29. class _WorkController(object):
  30. def __init__(self, *args, **kwargs):
  31. self.logger = log.get_default_logger()
  32. def start(self):
  33. pass
  34. class Worker(cd.Worker):
  35. WorkController = _WorkController
  36. class test_Worker(unittest.TestCase):
  37. Worker = Worker
  38. @disable_stdouts
  39. def test_queues_string(self):
  40. worker = self.Worker(queues="foo,bar,baz")
  41. self.assertEqual(worker.queues, ["foo", "bar", "baz"])
  42. @disable_stdouts
  43. def test_loglevel_string(self):
  44. worker = self.Worker(loglevel="INFO")
  45. self.assertEqual(worker.loglevel, logging.INFO)
  46. def test_run_worker(self):
  47. handlers = {}
  48. def i(sig, handler):
  49. handlers[sig] = handler
  50. p = platforms.install_signal_handler
  51. platforms.install_signal_handler = i
  52. try:
  53. w = self.Worker()
  54. w._isatty = False
  55. w.run_worker()
  56. for sig in "SIGINT", "SIGHUP", "SIGTERM":
  57. self.assertIn(sig, handlers)
  58. handlers.clear()
  59. w = self.Worker()
  60. w._isatty = True
  61. w.run_worker()
  62. for sig in "SIGINT", "SIGTERM":
  63. self.assertIn(sig, handlers)
  64. self.assertNotIn("SIGHUP", handlers)
  65. finally:
  66. platforms.install_signal_handler = p
  67. @disable_stdouts
  68. def test_startup_info(self):
  69. worker = self.Worker()
  70. worker.run()
  71. self.assertTrue(worker.startup_info())
  72. worker.loglevel = logging.DEBUG
  73. self.assertTrue(worker.startup_info())
  74. worker.loglevel = logging.INFO
  75. self.assertTrue(worker.startup_info())
  76. @disable_stdouts
  77. def test_run(self):
  78. self.Worker().run()
  79. self.Worker(discard=True).run()
  80. worker = self.Worker()
  81. worker.init_loader()
  82. worker.settings.DEBUG = True
  83. def with_catch_warnings(log):
  84. worker.run()
  85. self.assertIn("memory leak", log[0].message.args[0])
  86. context = catch_warnings(record=True)
  87. execute_context(context, with_catch_warnings)
  88. worker.settings.DEBUG = False
  89. @disable_stdouts
  90. def test_purge_messages(self):
  91. self.Worker().purge_messages()
  92. @disable_stdouts
  93. def test_init_queues(self):
  94. p, conf.QUEUES = conf.QUEUES, {
  95. "celery": {"exchange": "celery",
  96. "binding_key": "celery"},
  97. "video": {"exchange": "video",
  98. "binding_key": "video"}}
  99. try:
  100. self.Worker(queues=["video"]).init_queues()
  101. self.assertIn("video", conf.QUEUES)
  102. self.assertNotIn("celery", conf.QUEUES)
  103. conf.CREATE_MISSING_QUEUES = False
  104. self.assertRaises(ImproperlyConfigured,
  105. self.Worker(queues=["image"]).init_queues)
  106. conf.CREATE_MISSING_QUEUES = True
  107. self.Worker(queues=["image"]).init_queues()
  108. self.assertIn("image", conf.QUEUES)
  109. finally:
  110. conf.QUEUES = p
  111. @disable_stdouts
  112. def test_on_listener_ready(self):
  113. worker_ready_sent = [False]
  114. def on_worker_ready(**kwargs):
  115. worker_ready_sent[0] = True
  116. signals.worker_ready.connect(on_worker_ready)
  117. self.Worker().on_listener_ready(object())
  118. self.assertTrue(worker_ready_sent[0])
  119. class test_funs(unittest.TestCase):
  120. @disable_stdouts
  121. def test_set_process_status(self):
  122. worker = Worker(hostname="xyzza")
  123. prev1, sys.argv = sys.argv, ["Arg0"]
  124. try:
  125. st = worker.set_process_status("Running")
  126. self.assertIn("celeryd", st)
  127. self.assertIn("xyzza", st)
  128. self.assertIn("Running", st)
  129. prev2, sys.argv = sys.argv, ["Arg0", "Arg1"]
  130. try:
  131. st = worker.set_process_status("Running")
  132. self.assertIn("celeryd", st)
  133. self.assertIn("xyzza", st)
  134. self.assertIn("Running", st)
  135. self.assertIn("Arg1", st)
  136. finally:
  137. sys.argv = prev2
  138. finally:
  139. sys.argv = prev1
  140. @disable_stdouts
  141. def test_parse_options(self):
  142. cmd = WorkerCommand()
  143. opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
  144. self.assertEqual(opts.concurrency, 512)
  145. @disable_stdouts
  146. def test_run_worker(self):
  147. p, cd.Worker = cd.Worker, Worker
  148. try:
  149. cd.run_worker(discard=True)
  150. finally:
  151. cd.Worker = p
  152. @disable_stdouts
  153. def test_main(self):
  154. p, cd.Worker = cd.Worker, Worker
  155. s, sys.argv = sys.argv, ["celeryd", "--discard"]
  156. try:
  157. celeryd_main()
  158. finally:
  159. cd.Worker = p
  160. sys.argv = s
  161. class test_signal_handlers(unittest.TestCase):
  162. class _Worker(object):
  163. stopped = False
  164. terminated = False
  165. logger = get_logger()
  166. def stop(self):
  167. self.stopped = True
  168. def terminate(self):
  169. self.terminated = True
  170. def psig(self, fun, *args, **kwargs):
  171. handlers = {}
  172. def i(sig, handler):
  173. handlers[sig] = handler
  174. p, platforms.install_signal_handler = \
  175. platforms.install_signal_handler, i
  176. try:
  177. fun(*args, **kwargs)
  178. return handlers
  179. finally:
  180. platforms.install_signal_handler = p
  181. @disable_stdouts
  182. def test_worker_int_handler(self):
  183. worker = self._Worker()
  184. handlers = self.psig(cd.install_worker_int_handler, worker)
  185. next_handlers = {}
  186. def i(sig, handler):
  187. next_handlers[sig] = handler
  188. p = platforms.install_signal_handler
  189. platforms.install_signal_handler = i
  190. try:
  191. self.assertRaises(SystemExit, handlers["SIGINT"],
  192. "SIGINT", object())
  193. self.assertTrue(worker.stopped)
  194. finally:
  195. platforms.install_signal_handler = p
  196. self.assertRaises(SystemExit, next_handlers["SIGINT"],
  197. "SIGINT", object())
  198. self.assertTrue(worker.terminated)
  199. @disable_stdouts
  200. def test_worker_int_handler_only_stop_MainProcess(self):
  201. process = current_process()
  202. name, process.name = process.name, "OtherProcess"
  203. try:
  204. worker = self._Worker()
  205. handlers = self.psig(cd.install_worker_int_handler, worker)
  206. self.assertRaises(SystemExit, handlers["SIGINT"],
  207. "SIGINT", object())
  208. self.assertFalse(worker.stopped)
  209. finally:
  210. process.name = name
  211. @disable_stdouts
  212. def test_worker_int_again_handler_only_stop_MainProcess(self):
  213. process = current_process()
  214. name, process.name = process.name, "OtherProcess"
  215. try:
  216. worker = self._Worker()
  217. handlers = self.psig(cd.install_worker_int_again_handler, worker)
  218. self.assertRaises(SystemExit, handlers["SIGINT"],
  219. "SIGINT", object())
  220. self.assertFalse(worker.terminated)
  221. finally:
  222. process.name = name
  223. @disable_stdouts
  224. def test_worker_term_handler(self):
  225. worker = self._Worker()
  226. handlers = self.psig(cd.install_worker_term_handler, worker)
  227. self.assertRaises(SystemExit, handlers["SIGTERM"],
  228. "SIGTERM", object())
  229. self.assertTrue(worker.stopped)
  230. @disable_stdouts
  231. def test_worker_term_handler_only_stop_MainProcess(self):
  232. process = current_process()
  233. name, process.name = process.name, "OtherProcess"
  234. try:
  235. worker = self._Worker()
  236. handlers = self.psig(cd.install_worker_term_handler, worker)
  237. self.assertRaises(SystemExit, handlers["SIGTERM"],
  238. "SIGTERM", object())
  239. self.assertFalse(worker.stopped)
  240. finally:
  241. process.name = name
  242. @disable_stdouts
  243. def test_worker_restart_handler(self):
  244. argv = []
  245. def _execv(*args):
  246. argv.extend(args)
  247. execv, os.execv = os.execv, _execv
  248. try:
  249. worker = self._Worker()
  250. handlers = self.psig(cd.install_worker_restart_handler, worker)
  251. handlers["SIGHUP"]("SIGHUP", object())
  252. self.assertTrue(worker.stopped)
  253. self.assertTrue(argv)
  254. finally:
  255. os.execv = execv