test_celeryd.py 16 KB


  1. import logging
  2. import os
  3. import sys
  4. import warnings
  5. try:
  6. from multiprocessing import current_process
  7. except ImportError:
  8. current_process = None
  9. from nose import SkipTest
  10. from kombu.tests.utils import redirect_stdouts
  11. from celery import Celery
  12. from celery import platforms
  13. from celery import signals
  14. from celery import current_app
  15. from celery.apps import worker as cd
  16. from celery.bin.celeryd import WorkerCommand, windows_main, \
  17. main as celeryd_main
  18. from celery.exceptions import ImproperlyConfigured
  19. from celery.utils import patch
  20. from celery.utils.functional import wraps
  21. from celery.tests.compat import catch_warnings
  22. from celery.tests.utils import execute_context
  23. from celery.tests.utils import AppCase
  24. from celery.tests.utils import StringIO
  # Runs once at import time, before any of the worker classes below exist.
  # NOTE(review): going by the helper's name this presumably makes the
  # logger class process-aware -- confirm against celery.utils.patch.
  25. patch.ensure_process_aware_logger()
  26. def disable_stdouts(fun):
  27. @wraps(fun)
  28. def disable(*args, **kwargs):
  29. sys.stdout, sys.stderr = StringIO(), StringIO()
  30. try:
  31. return fun(*args, **kwargs)
  32. finally:
  33. sys.stdout = sys.__stdout__
  34. sys.stderr = sys.__stderr__
  35. return disable
  36. class _WorkController(object):
  37. def __init__(self, *args, **kwargs):
  38. self.logger = current_app.log.get_default_logger()
  39. def start(self):
  40. pass
  41. class Worker(cd.Worker):
  42. WorkController = _WorkController
  43. class test_Worker(AppCase):
  44. Worker = Worker
  45. @disable_stdouts
  46. def test_queues_string(self):
  47. celery = Celery(set_as_current=False)
  48. worker = celery.Worker(queues="foo,bar,baz")
  49. worker.init_queues()
  50. self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
  51. self.assertTrue("foo" in celery.amqp.queues)
  52. @disable_stdouts
  53. def test_loglevel_string(self):
  54. worker = self.Worker(loglevel="INFO")
  55. self.assertEqual(worker.loglevel, logging.INFO)
  56. def test_run_worker(self):
  57. handlers = {}
  58. def i(sig, handler):
  59. handlers[sig] = handler
  60. p = platforms.install_signal_handler
  61. platforms.install_signal_handler = i
  62. try:
  63. w = self.Worker()
  64. w._isatty = False
  65. w.run_worker()
  66. for sig in "SIGINT", "SIGHUP", "SIGTERM":
  67. self.assertIn(sig, handlers)
  68. handlers.clear()
  69. w = self.Worker()
  70. w._isatty = True
  71. w.run_worker()
  72. for sig in "SIGINT", "SIGTERM":
  73. self.assertIn(sig, handlers)
  74. self.assertNotIn("SIGHUP", handlers)
  75. finally:
  76. platforms.install_signal_handler = p
  77. @disable_stdouts
  78. def test_startup_info(self):
  79. worker = self.Worker()
  80. worker.run()
  81. self.assertTrue(worker.startup_info())
  82. worker.loglevel = logging.DEBUG
  83. self.assertTrue(worker.startup_info())
  84. worker.loglevel = logging.INFO
  85. self.assertTrue(worker.startup_info())
  86. @disable_stdouts
  87. def test_run(self):
  88. self.Worker().run()
  89. self.Worker(discard=True).run()
  90. worker = self.Worker()
  91. worker.init_loader()
  92. worker.run()
  93. @disable_stdouts
  94. def test_purge_messages(self):
  95. self.Worker().purge_messages()
  96. @disable_stdouts
  97. def test_init_queues(self):
  98. app = current_app
  99. c = app.conf
  100. p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
  101. "celery": {"exchange": "celery",
  102. "binding_key": "celery"},
  103. "video": {"exchange": "video",
  104. "binding_key": "video"}})
  105. try:
  106. worker = self.Worker(queues=["video"])
  107. worker.init_queues()
  108. self.assertIn("video", app.amqp.queues)
  109. self.assertIn("video", app.amqp.queues.consume_from)
  110. self.assertIn("celery", app.amqp.queues)
  111. self.assertNotIn("celery", app.amqp.queues.consume_from)
  112. c.CELERY_CREATE_MISSING_QUEUES = False
  113. self.assertRaises(ImproperlyConfigured,
  114. self.Worker(queues=["image"]).init_queues)
  115. c.CELERY_CREATE_MISSING_QUEUES = True
  116. worker = self.Worker(queues=["image"])
  117. worker.init_queues()
  118. self.assertIn("image", app.amqp.queues.consume_from)
  119. self.assertDictContainsSubset({"exchange": "image",
  120. "routing_key": "image",
  121. "binding_key": "image",
  122. "exchange_type": "direct"},
  123. app.amqp.queues["image"])
  124. finally:
  125. app.amqp.queues = p
  126. @disable_stdouts
  127. def test_autoscale_argument(self):
  128. worker1 = self.Worker(autoscale="10,3")
  129. self.assertListEqual(worker1.autoscale, [10, 3])
  130. worker2 = self.Worker(autoscale="10")
  131. self.assertListEqual(worker2.autoscale, [10, 0])
  132. def test_include_argument(self):
  133. worker1 = self.Worker(include="some.module")
  134. self.assertListEqual(worker1.include, ["some.module"])
  135. worker2 = self.Worker(include="some.module,another.package")
  136. self.assertListEqual(worker2.include, ["some.module",
  137. "another.package"])
  138. worker3 = self.Worker(include="os,sys")
  139. worker3.init_loader()
  140. @disable_stdouts
  141. def test_unknown_loglevel(self):
  142. self.assertRaises(SystemExit, self.Worker, loglevel="ALIEN")
  143. worker1 = self.Worker(loglevel=0xFFFF)
  144. self.assertEqual(worker1.loglevel, 0xFFFF)
  145. def test_warns_if_running_as_privileged_user(self):
  146. app = current_app
  147. if app.IS_WINDOWS:
  148. raise SkipTest("Not applicable on Windows")
  149. warnings.resetwarnings()
  150. def geteuid():
  151. return 0
  152. prev, os.geteuid = os.geteuid, geteuid
  153. try:
  154. def with_catch_warnings(log):
  155. worker = self.Worker()
  156. worker.run()
  157. self.assertTrue(log)
  158. self.assertIn("superuser privileges is not encouraged",
  159. log[0].message.args[0])
  160. context = catch_warnings(record=True)
  161. execute_context(context, with_catch_warnings)
  162. finally:
  163. os.geteuid = prev
  164. @disable_stdouts
  165. def test_use_pidfile(self):
  166. from celery import platforms
  167. class create_pidlock(object):
  168. instance = [None]
  169. def __init__(self, file):
  170. self.file = file
  171. self.instance[0] = self
  172. def acquire(self):
  173. self.acquired = True
  174. class Object(object):
  175. def release(self):
  176. pass
  177. return Object()
  178. prev, platforms.create_pidlock = platforms.create_pidlock, \
  179. create_pidlock
  180. try:
  181. worker = self.Worker(pidfile="pidfilelockfilepid")
  182. worker.run_worker()
  183. self.assertTrue(create_pidlock.instance[0].acquired)
  184. finally:
  185. platforms.create_pidlock = prev
  186. @disable_stdouts
  187. def test_redirect_stdouts(self):
  188. worker = self.Worker()
  189. worker.redirect_stdouts = False
  190. worker.redirect_stdouts_to_logger()
  191. self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
  192. def test_redirect_stdouts_already_handled(self):
  193. logging_setup = [False]
  194. def on_logging_setup(**kwargs):
  195. logging_setup[0] = True
  196. signals.setup_logging.connect(on_logging_setup)
  197. try:
  198. worker = self.Worker()
  199. worker.app.log.__class__._setup = False
  200. worker.redirect_stdouts_to_logger()
  201. self.assertTrue(logging_setup[0])
  202. self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
  203. finally:
  204. signals.setup_logging.disconnect(on_logging_setup)
  205. @disable_stdouts
  206. def test_platform_tweaks_osx(self):
  207. class OSXWorker(self.Worker):
  208. proxy_workaround_installed = False
  209. def osx_proxy_detection_workaround(self):
  210. self.proxy_workaround_installed = True
  211. worker = OSXWorker()
  212. def install_HUP_nosupport(controller):
  213. controller.hup_not_supported_installed = True
  214. class Controller(object):
  215. logger = logging.getLogger("celery.tests")
  216. prev = cd.install_HUP_not_supported_handler
  217. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  218. try:
  219. worker.app.IS_OSX = True
  220. controller = Controller()
  221. worker.install_platform_tweaks(controller)
  222. self.assertTrue(controller.hup_not_supported_installed)
  223. self.assertTrue(worker.proxy_workaround_installed)
  224. finally:
  225. cd.install_HUP_not_supported_handler = prev
  226. @disable_stdouts
  227. def test_general_platform_tweaks(self):
  228. restart_worker_handler_installed = [False]
  229. def install_worker_restart_handler(worker):
  230. restart_worker_handler_installed[0] = True
  231. class Controller(object):
  232. logger = logging.getLogger("celery.tests")
  233. prev = cd.install_worker_restart_handler
  234. cd.install_worker_restart_handler = install_worker_restart_handler
  235. try:
  236. worker = self.Worker()
  237. worker.app.IS_OSX = False
  238. worker.install_platform_tweaks(Controller())
  239. self.assertTrue(restart_worker_handler_installed[0])
  240. finally:
  241. cd.install_worker_restart_handler = prev
  242. @disable_stdouts
  243. def test_on_consumer_ready(self):
  244. worker_ready_sent = [False]
  245. def on_worker_ready(**kwargs):
  246. worker_ready_sent[0] = True
  247. signals.worker_ready.connect(on_worker_ready)
  248. self.Worker().on_consumer_ready(object())
  249. self.assertTrue(worker_ready_sent[0])
  250. class test_funs(AppCase):
  251. @redirect_stdouts
  252. def test_windows_main(self, stdout, stderr):
  253. windows_main()
  254. self.assertIn("celeryd command does not work on Windows",
  255. stderr.getvalue())
  256. @disable_stdouts
  257. def test_set_process_status(self):
  258. try:
  259. __import__("setproctitle")
  260. except ImportError:
  261. raise SkipTest("setproctitle not installed")
  262. worker = Worker(hostname="xyzza")
  263. prev1, sys.argv = sys.argv, ["Arg0"]
  264. try:
  265. st = worker.set_process_status("Running")
  266. self.assertIn("celeryd", st)
  267. self.assertIn("xyzza", st)
  268. self.assertIn("Running", st)
  269. prev2, sys.argv = sys.argv, ["Arg0", "Arg1"]
  270. try:
  271. st = worker.set_process_status("Running")
  272. self.assertIn("celeryd", st)
  273. self.assertIn("xyzza", st)
  274. self.assertIn("Running", st)
  275. self.assertIn("Arg1", st)
  276. finally:
  277. sys.argv = prev2
  278. finally:
  279. sys.argv = prev1
  280. @disable_stdouts
  281. def test_parse_options(self):
  282. cmd = WorkerCommand()
  283. cmd.app = current_app
  284. opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
  285. self.assertEqual(opts.concurrency, 512)
  286. @disable_stdouts
  287. def test_main(self):
  288. p, cd.Worker = cd.Worker, Worker
  289. s, sys.argv = sys.argv, ["celeryd", "--discard"]
  290. try:
  291. celeryd_main()
  292. finally:
  293. cd.Worker = p
  294. sys.argv = s
  295. class test_signal_handlers(AppCase):
  296. class _Worker(object):
  297. stopped = False
  298. terminated = False
  299. logger = current_app.log.get_default_logger()
  300. def stop(self, in_sighandler=False):
  301. self.stopped = True
  302. def terminate(self, in_sighandler=False):
  303. self.terminated = True
  304. def psig(self, fun, *args, **kwargs):
  305. handlers = {}
  306. def i(sig, handler):
  307. handlers[sig] = handler
  308. p, platforms.install_signal_handler = \
  309. platforms.install_signal_handler, i
  310. try:
  311. fun(*args, **kwargs)
  312. return handlers
  313. finally:
  314. platforms.install_signal_handler = p
  315. @disable_stdouts
  316. def test_worker_int_handler(self):
  317. worker = self._Worker()
  318. handlers = self.psig(cd.install_worker_int_handler, worker)
  319. next_handlers = {}
  320. def i(sig, handler):
  321. next_handlers[sig] = handler
  322. p = platforms.install_signal_handler
  323. platforms.install_signal_handler = i
  324. try:
  325. self.assertRaises(SystemExit, handlers["SIGINT"],
  326. "SIGINT", object())
  327. self.assertTrue(worker.stopped)
  328. finally:
  329. platforms.install_signal_handler = p
  330. self.assertRaises(SystemExit, next_handlers["SIGINT"],
  331. "SIGINT", object())
  332. self.assertTrue(worker.terminated)
  333. @disable_stdouts
  334. def test_worker_int_handler_only_stop_MainProcess(self):
  335. if current_process is None:
  336. raise SkipTest("only relevant for multiprocessing")
  337. process = current_process()
  338. name, process.name = process.name, "OtherProcess"
  339. try:
  340. worker = self._Worker()
  341. handlers = self.psig(cd.install_worker_int_handler, worker)
  342. self.assertRaises(SystemExit, handlers["SIGINT"],
  343. "SIGINT", object())
  344. self.assertFalse(worker.stopped)
  345. finally:
  346. process.name = name
  347. @disable_stdouts
  348. def test_install_HUP_not_supported_handler(self):
  349. worker = self._Worker()
  350. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  351. handlers["SIGHUP"]("SIGHUP", object())
  352. @disable_stdouts
  353. def test_worker_int_again_handler_only_stop_MainProcess(self):
  354. if current_process is None:
  355. raise SkipTest("only relevant for multiprocessing")
  356. process = current_process()
  357. name, process.name = process.name, "OtherProcess"
  358. try:
  359. worker = self._Worker()
  360. handlers = self.psig(cd.install_worker_int_again_handler, worker)
  361. self.assertRaises(SystemExit, handlers["SIGINT"],
  362. "SIGINT", object())
  363. self.assertFalse(worker.terminated)
  364. finally:
  365. process.name = name
  366. @disable_stdouts
  367. def test_worker_term_handler(self):
  368. worker = self._Worker()
  369. handlers = self.psig(cd.install_worker_term_handler, worker)
  370. self.assertRaises(SystemExit, handlers["SIGTERM"],
  371. "SIGTERM", object())
  372. self.assertTrue(worker.stopped)
  373. def test_worker_cry_handler(self):
  374. if sys.platform.startswith("java"):
  375. raise SkipTest("Cry handler does not work on Jython")
  376. if hasattr(sys, "pypy_version_info"):
  377. raise SkipTest("Cry handler does not work on PyPy")
  378. if sys.version_info > (2, 5):
  379. class Logger(object):
  380. _errors = []
  381. def error(self, msg, *args, **kwargs):
  382. self._errors.append(msg)
  383. logger = Logger()
  384. handlers = self.psig(cd.install_cry_handler, logger)
  385. self.assertIsNone(handlers["SIGUSR1"]("SIGUSR1", object()))
  386. self.assertTrue(Logger._errors)
  387. else:
  388. raise SkipTest("Needs Python 2.5 or later")
  389. @disable_stdouts
  390. def test_worker_term_handler_only_stop_MainProcess(self):
  391. if current_process is None:
  392. raise SkipTest("only relevant for multiprocessing")
  393. process = current_process()
  394. name, process.name = process.name, "OtherProcess"
  395. try:
  396. worker = self._Worker()
  397. handlers = self.psig(cd.install_worker_term_handler, worker)
  398. self.assertRaises(SystemExit, handlers["SIGTERM"],
  399. "SIGTERM", object())
  400. self.assertFalse(worker.stopped)
  401. finally:
  402. process.name = name
  403. @disable_stdouts
  404. def test_worker_restart_handler(self):
  405. if getattr(os, "execv", None) is None:
  406. raise SkipTest("platform does not have excv")
  407. argv = []
  408. def _execv(*args):
  409. argv.extend(args)
  410. execv, os.execv = os.execv, _execv
  411. try:
  412. worker = self._Worker()
  413. handlers = self.psig(cd.install_worker_restart_handler, worker)
  414. handlers["SIGHUP"]("SIGHUP", object())
  415. self.assertTrue(worker.stopped)
  416. self.assertTrue(argv)
  417. finally:
  418. os.execv = execv