test_celeryd.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import logging
  4. import os
  5. import sys
  6. from functools import wraps
  7. from mock import patch
  8. from nose import SkipTest
  9. from celery import Celery
  10. from celery import platforms
  11. from celery import signals
  12. from celery import current_app
  13. from celery.apps import worker as cd
  14. from celery.bin.celeryd import WorkerCommand, main as celeryd_main
  15. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  16. from celery.tests.utils import (AppCase, WhateverIO, mask_modules,
  17. reset_modules, skip_unless_module)
  18. from celery.utils.mp import current_process
  19. from celery.utils.log import ensure_process_aware_logger
  20. ensure_process_aware_logger()
  21. def disable_stdouts(fun):
  22. @wraps(fun)
  23. def disable(*args, **kwargs):
  24. sys.stdout, sys.stderr = WhateverIO(), WhateverIO()
  25. try:
  26. return fun(*args, **kwargs)
  27. finally:
  28. sys.stdout = sys.__stdout__
  29. sys.stderr = sys.__stderr__
  30. return disable
  31. class _WorkController(object):
  32. def __init__(self, *args, **kwargs):
  33. pass
  34. def start(self):
  35. pass
  36. class Worker(cd.Worker):
  37. WorkController = _WorkController
  38. class test_Worker(AppCase):
  39. Worker = Worker
  40. @disable_stdouts
  41. def test_queues_string(self):
  42. celery = Celery(set_as_current=False)
  43. worker = celery.Worker(queues="foo,bar,baz")
  44. worker.init_queues()
  45. self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
  46. self.assertTrue("foo" in celery.amqp.queues)
  47. @disable_stdouts
  48. def test_windows_B_option(self):
  49. celery = Celery(set_as_current=False)
  50. celery.IS_WINDOWS = True
  51. with self.assertRaises(SystemExit):
  52. celery.Worker(embed_clockservice=True)
  53. def test_tasklist(self):
  54. celery = Celery(set_as_current=False)
  55. worker = celery.Worker()
  56. self.assertTrue(worker.app.tasks)
  57. self.assertTrue(worker.app.finalized)
  58. self.assertTrue(worker.tasklist(include_builtins=True))
  59. worker.tasklist(include_builtins=False)
  60. def test_extra_info(self):
  61. celery = Celery(set_as_current=False)
  62. worker = celery.Worker()
  63. worker.loglevel = logging.WARNING
  64. self.assertFalse(worker.extra_info())
  65. worker.loglevel = logging.INFO
  66. self.assertTrue(worker.extra_info())
  67. @disable_stdouts
  68. def test_loglevel_string(self):
  69. worker = self.Worker(loglevel="INFO")
  70. self.assertEqual(worker.loglevel, logging.INFO)
  71. def test_run_worker(self):
  72. handlers = {}
  73. class Signals(platforms.Signals):
  74. def __setitem__(self, sig, handler):
  75. handlers[sig] = handler
  76. p = platforms.signals
  77. platforms.signals = Signals()
  78. try:
  79. w = self.Worker()
  80. w._isatty = False
  81. w.run_worker()
  82. for sig in "SIGINT", "SIGHUP", "SIGTERM":
  83. self.assertIn(sig, handlers)
  84. handlers.clear()
  85. w = self.Worker()
  86. w._isatty = True
  87. w.run_worker()
  88. for sig in "SIGINT", "SIGTERM":
  89. self.assertIn(sig, handlers)
  90. self.assertNotIn("SIGHUP", handlers)
  91. finally:
  92. platforms.signals = p
  93. @disable_stdouts
  94. def test_startup_info(self):
  95. worker = self.Worker()
  96. worker.run()
  97. self.assertTrue(worker.startup_info())
  98. worker.loglevel = logging.DEBUG
  99. self.assertTrue(worker.startup_info())
  100. worker.loglevel = logging.INFO
  101. self.assertTrue(worker.startup_info())
  102. worker.autoscale = 13, 10
  103. self.assertTrue(worker.startup_info())
  104. @disable_stdouts
  105. def test_run(self):
  106. self.Worker().run()
  107. self.Worker(discard=True).run()
  108. worker = self.Worker()
  109. worker.init_loader()
  110. worker.run()
  111. @disable_stdouts
  112. def test_purge_messages(self):
  113. self.Worker().purge_messages()
  114. @disable_stdouts
  115. def test_init_queues(self):
  116. app = current_app
  117. c = app.conf
  118. p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
  119. "celery": {"exchange": "celery",
  120. "binding_key": "celery"},
  121. "video": {"exchange": "video",
  122. "binding_key": "video"}})
  123. try:
  124. worker = self.Worker(queues=["video"])
  125. worker.init_queues()
  126. self.assertIn("video", app.amqp.queues)
  127. self.assertIn("video", app.amqp.queues.consume_from)
  128. self.assertIn("celery", app.amqp.queues)
  129. self.assertNotIn("celery", app.amqp.queues.consume_from)
  130. c.CELERY_CREATE_MISSING_QUEUES = False
  131. with self.assertRaises(ImproperlyConfigured):
  132. self.Worker(queues=["image"]).init_queues()
  133. c.CELERY_CREATE_MISSING_QUEUES = True
  134. worker = self.Worker(queues=["image"])
  135. worker.init_queues()
  136. self.assertIn("image", app.amqp.queues.consume_from)
  137. self.assertDictContainsSubset({"exchange": "image",
  138. "routing_key": "image",
  139. "binding_key": "image",
  140. "exchange_type": "direct"},
  141. app.amqp.queues["image"])
  142. finally:
  143. app.amqp.queues = p
  144. @disable_stdouts
  145. def test_autoscale_argument(self):
  146. worker1 = self.Worker(autoscale="10,3")
  147. self.assertListEqual(worker1.autoscale, [10, 3])
  148. worker2 = self.Worker(autoscale="10")
  149. self.assertListEqual(worker2.autoscale, [10, 0])
  150. def test_include_argument(self):
  151. worker1 = self.Worker(include="some.module")
  152. self.assertListEqual(worker1.include, ["some.module"])
  153. worker2 = self.Worker(include="some.module,another.package")
  154. self.assertListEqual(worker2.include, ["some.module",
  155. "another.package"])
  156. worker3 = self.Worker(include="os,sys")
  157. worker3.init_loader()
  158. @disable_stdouts
  159. def test_unknown_loglevel(self):
  160. with self.assertRaises(SystemExit):
  161. self.Worker(loglevel="ALIEN")
  162. worker1 = self.Worker(loglevel=0xFFFF)
  163. self.assertEqual(worker1.loglevel, 0xFFFF)
  164. def test_warns_if_running_as_privileged_user(self):
  165. app = current_app
  166. if app.IS_WINDOWS:
  167. raise SkipTest("Not applicable on Windows")
  168. def getuid():
  169. return 0
  170. prev, os.getuid = os.getuid, getuid
  171. try:
  172. with self.assertWarnsRegex(RuntimeWarning,
  173. r'superuser privileges is discouraged'):
  174. worker = self.Worker()
  175. worker.run()
  176. finally:
  177. os.getuid = prev
  178. @disable_stdouts
  179. def test_use_pidfile(self):
  180. from celery import platforms
  181. class create_pidlock(object):
  182. instance = [None]
  183. def __init__(self, file):
  184. self.file = file
  185. self.instance[0] = self
  186. def acquire(self):
  187. self.acquired = True
  188. class Object(object):
  189. def release(self):
  190. pass
  191. return Object()
  192. prev, platforms.create_pidlock = platforms.create_pidlock, \
  193. create_pidlock
  194. try:
  195. worker = self.Worker(pidfile="pidfilelockfilepid")
  196. worker.run_worker()
  197. self.assertTrue(create_pidlock.instance[0].acquired)
  198. finally:
  199. platforms.create_pidlock = prev
  200. @disable_stdouts
  201. def test_redirect_stdouts(self):
  202. worker = self.Worker()
  203. worker.redirect_stdouts = False
  204. worker.redirect_stdouts_to_logger()
  205. with self.assertRaises(AttributeError):
  206. sys.stdout.logger
  207. def test_redirect_stdouts_already_handled(self):
  208. logging_setup = [False]
  209. @signals.setup_logging.connect
  210. def on_logging_setup(**kwargs):
  211. logging_setup[0] = True
  212. try:
  213. worker = self.Worker()
  214. worker.app.log.__class__._setup = False
  215. worker.redirect_stdouts_to_logger()
  216. self.assertTrue(logging_setup[0])
  217. with self.assertRaises(AttributeError):
  218. sys.stdout.logger
  219. finally:
  220. signals.setup_logging.disconnect(on_logging_setup)
  221. @disable_stdouts
  222. def test_platform_tweaks_osx(self):
  223. class OSXWorker(self.Worker):
  224. proxy_workaround_installed = False
  225. def osx_proxy_detection_workaround(self):
  226. self.proxy_workaround_installed = True
  227. worker = OSXWorker()
  228. def install_HUP_nosupport(controller):
  229. controller.hup_not_supported_installed = True
  230. class Controller(object):
  231. pass
  232. prev = cd.install_HUP_not_supported_handler
  233. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  234. try:
  235. worker.app.IS_OSX = True
  236. controller = Controller()
  237. worker.install_platform_tweaks(controller)
  238. self.assertTrue(controller.hup_not_supported_installed)
  239. self.assertTrue(worker.proxy_workaround_installed)
  240. finally:
  241. cd.install_HUP_not_supported_handler = prev
  242. @disable_stdouts
  243. def test_general_platform_tweaks(self):
  244. restart_worker_handler_installed = [False]
  245. def install_worker_restart_handler(worker):
  246. restart_worker_handler_installed[0] = True
  247. class Controller(object):
  248. pass
  249. prev = cd.install_worker_restart_handler
  250. cd.install_worker_restart_handler = install_worker_restart_handler
  251. try:
  252. worker = self.Worker()
  253. worker.app.IS_OSX = False
  254. worker.install_platform_tweaks(Controller())
  255. self.assertTrue(restart_worker_handler_installed[0])
  256. finally:
  257. cd.install_worker_restart_handler = prev
  258. @disable_stdouts
  259. def test_on_consumer_ready(self):
  260. worker_ready_sent = [False]
  261. @signals.worker_ready.connect
  262. def on_worker_ready(**kwargs):
  263. worker_ready_sent[0] = True
  264. self.Worker().on_consumer_ready(object())
  265. self.assertTrue(worker_ready_sent[0])
  266. class test_funs(AppCase):
  267. @disable_stdouts
  268. def test_set_process_status(self):
  269. try:
  270. __import__("setproctitle")
  271. except ImportError:
  272. raise SkipTest("setproctitle not installed")
  273. worker = Worker(hostname="xyzza")
  274. prev1, sys.argv = sys.argv, ["Arg0"]
  275. try:
  276. st = worker.set_process_status("Running")
  277. self.assertIn("celeryd", st)
  278. self.assertIn("xyzza", st)
  279. self.assertIn("Running", st)
  280. prev2, sys.argv = sys.argv, ["Arg0", "Arg1"]
  281. try:
  282. st = worker.set_process_status("Running")
  283. self.assertIn("celeryd", st)
  284. self.assertIn("xyzza", st)
  285. self.assertIn("Running", st)
  286. self.assertIn("Arg1", st)
  287. finally:
  288. sys.argv = prev2
  289. finally:
  290. sys.argv = prev1
  291. @disable_stdouts
  292. def test_parse_options(self):
  293. cmd = WorkerCommand()
  294. cmd.app = current_app
  295. opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
  296. self.assertEqual(opts.concurrency, 512)
  297. @disable_stdouts
  298. def test_main(self):
  299. p, cd.Worker = cd.Worker, Worker
  300. s, sys.argv = sys.argv, ["celeryd", "--discard"]
  301. try:
  302. celeryd_main()
  303. finally:
  304. cd.Worker = p
  305. sys.argv = s
  306. class test_signal_handlers(AppCase):
  307. class _Worker(object):
  308. stopped = False
  309. terminated = False
  310. def stop(self, in_sighandler=False):
  311. self.stopped = True
  312. def terminate(self, in_sighandler=False):
  313. self.terminated = True
  314. def psig(self, fun, *args, **kwargs):
  315. handlers = {}
  316. class Signals(platforms.Signals):
  317. def __setitem__(self, sig, handler):
  318. handlers[sig] = handler
  319. p, platforms.signals = platforms.signals, Signals()
  320. try:
  321. fun(*args, **kwargs)
  322. return handlers
  323. finally:
  324. platforms.signals = p
  325. @disable_stdouts
  326. def test_worker_int_handler(self):
  327. worker = self._Worker()
  328. handlers = self.psig(cd.install_worker_int_handler, worker)
  329. next_handlers = {}
  330. class Signals(platforms.Signals):
  331. def __setitem__(self, sig, handler):
  332. next_handlers[sig] = handler
  333. p, platforms.signals = platforms.signals, Signals()
  334. try:
  335. with self.assertRaises(SystemExit):
  336. handlers["SIGINT"]("SIGINT", object())
  337. self.assertTrue(worker.stopped)
  338. finally:
  339. platforms.signals = p
  340. with self.assertRaises(SystemExit):
  341. next_handlers["SIGINT"]("SIGINT", object())
  342. self.assertTrue(worker.terminated)
  343. @disable_stdouts
  344. def test_worker_int_handler_only_stop_MainProcess(self):
  345. if current_process is None:
  346. raise SkipTest("only relevant for multiprocessing")
  347. process = current_process()
  348. name, process.name = process.name, "OtherProcess"
  349. try:
  350. worker = self._Worker()
  351. handlers = self.psig(cd.install_worker_int_handler, worker)
  352. with self.assertRaises(SystemExit):
  353. handlers["SIGINT"]("SIGINT", object())
  354. self.assertFalse(worker.stopped)
  355. finally:
  356. process.name = name
  357. @disable_stdouts
  358. def test_install_HUP_not_supported_handler(self):
  359. worker = self._Worker()
  360. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  361. handlers["SIGHUP"]("SIGHUP", object())
  362. @disable_stdouts
  363. def test_worker_int_again_handler_only_stop_MainProcess(self):
  364. if current_process is None:
  365. raise SkipTest("only relevant for multiprocessing")
  366. process = current_process()
  367. name, process.name = process.name, "OtherProcess"
  368. try:
  369. worker = self._Worker()
  370. handlers = self.psig(cd.install_worker_int_again_handler, worker)
  371. with self.assertRaises(SystemExit):
  372. handlers["SIGINT"]("SIGINT", object())
  373. self.assertFalse(worker.terminated)
  374. finally:
  375. process.name = name
  376. @disable_stdouts
  377. def test_worker_term_handler(self):
  378. worker = self._Worker()
  379. handlers = self.psig(cd.install_worker_term_handler, worker)
  380. with self.assertRaises(SystemExit):
  381. handlers["SIGTERM"]("SIGTERM", object())
  382. self.assertTrue(worker.stopped)
  383. @patch("celery.apps.worker.logger")
  384. def test_worker_cry_handler(self, logger):
  385. if sys.platform.startswith("java"):
  386. raise SkipTest("Cry handler does not work on Jython")
  387. if hasattr(sys, "pypy_version_info"):
  388. raise SkipTest("Cry handler does not work on PyPy")
  389. if sys.version_info > (2, 5):
  390. handlers = self.psig(cd.install_cry_handler)
  391. self.assertIsNone(handlers["SIGUSR1"]("SIGUSR1", object()))
  392. self.assertTrue(logger.error.called)
  393. else:
  394. raise SkipTest("Needs Python 2.5 or later")
  395. @disable_stdouts
  396. def test_worker_term_handler_only_stop_MainProcess(self):
  397. if current_process is None:
  398. raise SkipTest("only relevant for multiprocessing")
  399. process = current_process()
  400. name, process.name = process.name, "OtherProcess"
  401. try:
  402. worker = self._Worker()
  403. handlers = self.psig(cd.install_worker_term_handler, worker)
  404. with self.assertRaises(SystemExit):
  405. handlers["SIGTERM"]("SIGTERM", object())
  406. self.assertFalse(worker.stopped)
  407. finally:
  408. process.name = name
  409. @disable_stdouts
  410. def test_worker_restart_handler(self):
  411. if getattr(os, "execv", None) is None:
  412. raise SkipTest("platform does not have excv")
  413. argv = []
  414. def _execv(*args):
  415. argv.extend(args)
  416. execv, os.execv = os.execv, _execv
  417. try:
  418. worker = self._Worker()
  419. handlers = self.psig(cd.install_worker_restart_handler, worker)
  420. handlers["SIGHUP"]("SIGHUP", object())
  421. self.assertTrue(worker.stopped)
  422. self.assertTrue(argv)
  423. finally:
  424. os.execv = execv
  425. @disable_stdouts
  426. def test_worker_term_hard_handler(self):
  427. worker = self._Worker()
  428. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  429. with self.assertRaises(SystemTerminate):
  430. handlers["SIGQUIT"]("SIGQUIT", object())
  431. self.assertTrue(worker.terminated)