test_celeryd.py 15 KB


  1. import logging
  2. import os
  3. import sys
  4. from multiprocessing import get_logger, current_process
  5. from kombu.tests.utils import redirect_stdouts
  6. from celery import Celery
  7. from celery import platforms
  8. from celery import signals
  9. from celery.app import app_or_default
  10. from celery.apps import worker as cd
  11. from celery.bin.celeryd import WorkerCommand, windows_main, \
  12. main as celeryd_main
  13. from celery.exceptions import ImproperlyConfigured
  14. from celery.utils import patch
  15. from celery.utils.functional import wraps
  16. from celery.tests.compat import catch_warnings
  17. from celery.tests.utils import execute_context
  18. from celery.tests.utils import unittest
  19. from celery.tests.utils import StringIO
  20. patch.ensure_process_aware_logger()
  21. def disable_stdouts(fun):
  22. @wraps(fun)
  23. def disable(*args, **kwargs):
  24. sys.stdout, sys.stderr = StringIO(), StringIO()
  25. try:
  26. return fun(*args, **kwargs)
  27. finally:
  28. sys.stdout = sys.__stdout__
  29. sys.stderr = sys.__stderr__
  30. return disable
  31. class _WorkController(object):
  32. def __init__(self, *args, **kwargs):
  33. self.logger = app_or_default().log.get_default_logger()
  34. def start(self):
  35. pass
  36. class Worker(cd.Worker):
  37. WorkController = _WorkController
  38. class test_Worker(unittest.TestCase):
  39. Worker = Worker
  40. @disable_stdouts
  41. def test_queues_string(self):
  42. celery = Celery(set_as_current=False)
  43. worker = celery.Worker(queues="foo,bar,baz")
  44. worker.init_queues()
  45. self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
  46. self.assertTrue("foo" in celery.amqp.queues)
  47. @disable_stdouts
  48. def test_loglevel_string(self):
  49. worker = self.Worker(loglevel="INFO")
  50. self.assertEqual(worker.loglevel, logging.INFO)
  51. def test_run_worker(self):
  52. handlers = {}
  53. def i(sig, handler):
  54. handlers[sig] = handler
  55. p = platforms.install_signal_handler
  56. platforms.install_signal_handler = i
  57. try:
  58. w = self.Worker()
  59. w._isatty = False
  60. w.run_worker()
  61. for sig in "SIGINT", "SIGHUP", "SIGTERM":
  62. self.assertIn(sig, handlers)
  63. handlers.clear()
  64. w = self.Worker()
  65. w._isatty = True
  66. w.run_worker()
  67. for sig in "SIGINT", "SIGTERM":
  68. self.assertIn(sig, handlers)
  69. self.assertNotIn("SIGHUP", handlers)
  70. finally:
  71. platforms.install_signal_handler = p
  72. @disable_stdouts
  73. def test_startup_info(self):
  74. worker = self.Worker()
  75. worker.run()
  76. self.assertTrue(worker.startup_info())
  77. worker.loglevel = logging.DEBUG
  78. self.assertTrue(worker.startup_info())
  79. worker.loglevel = logging.INFO
  80. self.assertTrue(worker.startup_info())
  81. @disable_stdouts
  82. def test_run(self):
  83. self.Worker().run()
  84. self.Worker(discard=True).run()
  85. worker = self.Worker()
  86. worker.init_loader()
  87. worker.settings.DEBUG = True
  88. def with_catch_warnings(log):
  89. worker.run()
  90. self.assertIn("memory leak", log[0].message.args[0])
  91. context = catch_warnings(record=True)
  92. execute_context(context, with_catch_warnings)
  93. worker.settings.DEBUG = False
  94. @disable_stdouts
  95. def test_purge_messages(self):
  96. self.Worker().purge_messages()
  97. @disable_stdouts
  98. def test_init_queues(self):
  99. app = app_or_default()
  100. c = app.conf
  101. p, app.amqp.queues = app.amqp.queues, {
  102. "celery": {"exchange": "celery",
  103. "binding_key": "celery"},
  104. "video": {"exchange": "video",
  105. "binding_key": "video"}}
  106. try:
  107. worker = self.Worker(queues=["video"])
  108. worker.init_queues()
  109. self.assertIn("video", worker.queues)
  110. self.assertNotIn("celery", worker.queues)
  111. c.CELERY_CREATE_MISSING_QUEUES = False
  112. self.assertRaises(ImproperlyConfigured,
  113. self.Worker(queues=["image"]).init_queues)
  114. c.CELERY_CREATE_MISSING_QUEUES = True
  115. worker = self.Worker(queues=["image"])
  116. worker.init_queues()
  117. self.assertIn("image", worker.queues)
  118. self.assertDictContainsSubset({"exchange": "image",
  119. "routing_key": "image",
  120. "binding_key": "image",
  121. "exchange_type": "direct"},
  122. worker.queues["image"])
  123. finally:
  124. app.amqp.queues = p
  125. @disable_stdouts
  126. def test_autoscale_argument(self):
  127. worker1 = self.Worker(autoscale="10,3")
  128. self.assertListEqual(worker1.autoscale, [10, 3])
  129. worker2 = self.Worker(autoscale="10")
  130. self.assertListEqual(worker2.autoscale, [10, 0])
  131. @disable_stdouts
  132. def test_include_argument(self):
  133. worker1 = self.Worker(include="some.module")
  134. self.assertListEqual(worker1.include, ["some.module"])
  135. worker2 = self.Worker(include="some.module,another.package")
  136. self.assertListEqual(worker2.include, ["some.module",
  137. "another.package"])
  138. worker3 = self.Worker(include="os,sys")
  139. worker3.init_loader()
  140. @disable_stdouts
  141. def test_unknown_loglevel(self):
  142. self.assertRaises(SystemExit, self.Worker, loglevel="ALIEN")
  143. worker1 = self.Worker(loglevel=0xFFFF)
  144. self.assertEqual(worker1.loglevel, 0xFFFF)
  145. @disable_stdouts
  146. def test_warns_if_running_as_privileged_user(self):
  147. def geteuid():
  148. return 0
  149. prev, os.geteuid = os.geteuid, geteuid
  150. try:
  151. def with_catch_warnings(log):
  152. worker = self.Worker()
  153. worker.run()
  154. self.assertTrue(log)
  155. self.assertIn("supervisor privileges is not encouraged",
  156. log[0].message.args[0])
  157. context = catch_warnings(record=True)
  158. execute_context(context, with_catch_warnings)
  159. finally:
  160. os.geteuid = prev
  161. @disable_stdouts
  162. def test_use_pidfile(self):
  163. from celery import platforms
  164. class create_pidlock(object):
  165. instance = [None]
  166. def __init__(self, file):
  167. self.file = file
  168. self.instance[0] = self
  169. def acquire(self):
  170. self.acquired = True
  171. class Object(object):
  172. def release(self):
  173. pass
  174. return Object()
  175. prev, platforms.create_pidlock = platforms.create_pidlock, \
  176. create_pidlock
  177. try:
  178. worker = self.Worker(pidfile="pidfilelockfilepid")
  179. worker.run_worker()
  180. self.assertTrue(create_pidlock.instance[0].acquired)
  181. finally:
  182. platforms.create_pidlock = prev
  183. @disable_stdouts
  184. def test_redirect_stdouts(self):
  185. worker = self.Worker()
  186. worker.redirect_stdouts = False
  187. worker.redirect_stdouts_to_logger()
  188. self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
  189. def test_redirect_stdouts_already_handled(self):
  190. from celery import signals
  191. logging_setup = [False]
  192. def on_logging_setup(**kwargs):
  193. logging_setup[0] = True
  194. signals.setup_logging.connect(on_logging_setup)
  195. try:
  196. worker = self.Worker()
  197. worker.app.log.__class__._setup = False
  198. worker.redirect_stdouts_to_logger()
  199. self.assertTrue(logging_setup[0])
  200. self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
  201. finally:
  202. signals.setup_logging.disconnect(on_logging_setup)
  203. @disable_stdouts
  204. def test_platform_tweaks_osx(self):
  205. class OSXWorker(self.Worker):
  206. proxy_workaround_installed = False
  207. def osx_proxy_detection_workaround(self):
  208. self.proxy_workaround_installed = True
  209. worker = OSXWorker()
  210. def install_HUP_nosupport(controller):
  211. controller.hup_not_supported_installed = True
  212. class Controller(object):
  213. logger = logging.getLogger("celery.tests")
  214. prev = cd.install_HUP_not_supported_handler
  215. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  216. try:
  217. worker.app.IS_OSX = True
  218. controller = Controller()
  219. worker.install_platform_tweaks(controller)
  220. self.assertTrue(controller.hup_not_supported_installed)
  221. self.assertTrue(worker.proxy_workaround_installed)
  222. finally:
  223. cd.install_HUP_not_supported_handler = prev
  224. @disable_stdouts
  225. def test_general_platform_tweaks(self):
  226. restart_worker_handler_installed = [False]
  227. def install_worker_restart_handler(worker):
  228. restart_worker_handler_installed[0] = True
  229. class Controller(object):
  230. logger = logging.getLogger("celery.tests")
  231. prev = cd.install_worker_restart_handler
  232. cd.install_worker_restart_handler = install_worker_restart_handler
  233. try:
  234. worker = self.Worker()
  235. worker.app.IS_OSX = False
  236. worker.install_platform_tweaks(Controller())
  237. self.assertTrue(restart_worker_handler_installed[0])
  238. finally:
  239. cd.install_worker_restart_handler = prev
  240. @disable_stdouts
  241. def test_on_consumer_ready(self):
  242. worker_ready_sent = [False]
  243. def on_worker_ready(**kwargs):
  244. worker_ready_sent[0] = True
  245. signals.worker_ready.connect(on_worker_ready)
  246. self.Worker().on_consumer_ready(object())
  247. self.assertTrue(worker_ready_sent[0])
  248. class test_funs(unittest.TestCase):
  249. @redirect_stdouts
  250. def test_windows_main(self, stdout, stderr):
  251. windows_main()
  252. self.assertIn("celeryd command does not work on Windows",
  253. stderr.getvalue())
  254. @disable_stdouts
  255. def test_set_process_status(self):
  256. worker = Worker(hostname="xyzza")
  257. prev1, sys.argv = sys.argv, ["Arg0"]
  258. try:
  259. st = worker.set_process_status("Running")
  260. self.assertIn("celeryd", st)
  261. self.assertIn("xyzza", st)
  262. self.assertIn("Running", st)
  263. prev2, sys.argv = sys.argv, ["Arg0", "Arg1"]
  264. try:
  265. st = worker.set_process_status("Running")
  266. self.assertIn("celeryd", st)
  267. self.assertIn("xyzza", st)
  268. self.assertIn("Running", st)
  269. self.assertIn("Arg1", st)
  270. finally:
  271. sys.argv = prev2
  272. finally:
  273. sys.argv = prev1
  274. @disable_stdouts
  275. def test_parse_options(self):
  276. cmd = WorkerCommand()
  277. cmd.app = app_or_default()
  278. opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
  279. self.assertEqual(opts.concurrency, 512)
  280. @disable_stdouts
  281. def test_main(self):
  282. p, cd.Worker = cd.Worker, Worker
  283. s, sys.argv = sys.argv, ["celeryd", "--discard"]
  284. try:
  285. celeryd_main()
  286. finally:
  287. cd.Worker = p
  288. sys.argv = s
  289. class test_signal_handlers(unittest.TestCase):
  290. class _Worker(object):
  291. stopped = False
  292. terminated = False
  293. logger = get_logger()
  294. def stop(self, in_sighandler=False):
  295. self.stopped = True
  296. def terminate(self, in_sighandler=False):
  297. self.terminated = True
  298. def psig(self, fun, *args, **kwargs):
  299. handlers = {}
  300. def i(sig, handler):
  301. handlers[sig] = handler
  302. p, platforms.install_signal_handler = \
  303. platforms.install_signal_handler, i
  304. try:
  305. fun(*args, **kwargs)
  306. return handlers
  307. finally:
  308. platforms.install_signal_handler = p
  309. @disable_stdouts
  310. def test_worker_int_handler(self):
  311. worker = self._Worker()
  312. handlers = self.psig(cd.install_worker_int_handler, worker)
  313. next_handlers = {}
  314. def i(sig, handler):
  315. next_handlers[sig] = handler
  316. p = platforms.install_signal_handler
  317. platforms.install_signal_handler = i
  318. try:
  319. self.assertRaises(SystemExit, handlers["SIGINT"],
  320. "SIGINT", object())
  321. self.assertTrue(worker.stopped)
  322. finally:
  323. platforms.install_signal_handler = p
  324. self.assertRaises(SystemExit, next_handlers["SIGINT"],
  325. "SIGINT", object())
  326. self.assertTrue(worker.terminated)
  327. @disable_stdouts
  328. def test_worker_int_handler_only_stop_MainProcess(self):
  329. process = current_process()
  330. name, process.name = process.name, "OtherProcess"
  331. try:
  332. worker = self._Worker()
  333. handlers = self.psig(cd.install_worker_int_handler, worker)
  334. self.assertRaises(SystemExit, handlers["SIGINT"],
  335. "SIGINT", object())
  336. self.assertFalse(worker.stopped)
  337. finally:
  338. process.name = name
  339. @disable_stdouts
  340. def test_install_HUP_not_supported_handler(self):
  341. worker = self._Worker()
  342. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  343. handlers["SIGHUP"]("SIGHUP", object())
  344. @disable_stdouts
  345. def test_worker_int_again_handler_only_stop_MainProcess(self):
  346. process = current_process()
  347. name, process.name = process.name, "OtherProcess"
  348. try:
  349. worker = self._Worker()
  350. handlers = self.psig(cd.install_worker_int_again_handler, worker)
  351. self.assertRaises(SystemExit, handlers["SIGINT"],
  352. "SIGINT", object())
  353. self.assertFalse(worker.terminated)
  354. finally:
  355. process.name = name
  356. @disable_stdouts
  357. def test_worker_term_handler(self):
  358. worker = self._Worker()
  359. handlers = self.psig(cd.install_worker_term_handler, worker)
  360. self.assertRaises(SystemExit, handlers["SIGTERM"],
  361. "SIGTERM", object())
  362. self.assertTrue(worker.stopped)
  363. def test_worker_cry_handler(self):
  364. class Logger(object):
  365. _errors = []
  366. def error(self, msg, *args, **kwargs):
  367. self._errors.append(msg)
  368. logger = Logger()
  369. handlers = self.psig(cd.install_cry_handler, logger)
  370. self.assertIsNone(handlers["SIGUSR1"]("SIGUSR1", object()))
  371. self.assertTrue(Logger._errors)
  372. @disable_stdouts
  373. def test_worker_term_handler_only_stop_MainProcess(self):
  374. process = current_process()
  375. name, process.name = process.name, "OtherProcess"
  376. try:
  377. worker = self._Worker()
  378. handlers = self.psig(cd.install_worker_term_handler, worker)
  379. self.assertRaises(SystemExit, handlers["SIGTERM"],
  380. "SIGTERM", object())
  381. self.assertFalse(worker.stopped)
  382. finally:
  383. process.name = name
  384. @disable_stdouts
  385. def test_worker_restart_handler(self):
  386. argv = []
  387. def _execv(*args):
  388. argv.extend(args)
  389. execv, os.execv = os.execv, _execv
  390. try:
  391. worker = self._Worker()
  392. handlers = self.psig(cd.install_worker_restart_handler, worker)
  393. handlers["SIGHUP"]("SIGHUP", object())
  394. self.assertTrue(worker.stopped)
  395. self.assertTrue(argv)
  396. finally:
  397. os.execv = execv