test_celeryd.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import logging
  4. import os
  5. import sys
  6. from functools import wraps
  7. from mock import patch
  8. from nose import SkipTest
  9. from billiard import current_process
  10. from celery import Celery
  11. from celery import platforms
  12. from celery import signals
  13. from celery import current_app
  14. from celery.apps import worker as cd
  15. from celery.bin.celeryd import WorkerCommand, main as celeryd_main
  16. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  17. from celery.tests.utils import (AppCase, WhateverIO, mask_modules,
  18. reset_modules, skip_unless_module)
  19. from celery.utils.log import ensure_process_aware_logger
  20. ensure_process_aware_logger()
  21. def disable_stdouts(fun):
  22. @wraps(fun)
  23. def disable(*args, **kwargs):
  24. sys.stdout, sys.stderr = WhateverIO(), WhateverIO()
  25. try:
  26. return fun(*args, **kwargs)
  27. finally:
  28. sys.stdout = sys.__stdout__
  29. sys.stderr = sys.__stderr__
  30. return disable
  31. class _WorkController(object):
  32. def __init__(self, *args, **kwargs):
  33. pass
  34. def start(self):
  35. pass
  36. class Worker(cd.Worker):
  37. WorkController = _WorkController
  38. class test_Worker(AppCase):
  39. Worker = Worker
  40. @disable_stdouts
  41. def test_queues_string(self):
  42. celery = Celery(set_as_current=False)
  43. worker = celery.Worker(queues="foo,bar,baz")
  44. worker.init_queues()
  45. self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
  46. self.assertTrue("foo" in celery.amqp.queues)
  47. @disable_stdouts
  48. def test_windows_B_option(self):
  49. celery = Celery(set_as_current=False)
  50. celery.IS_WINDOWS = True
  51. with self.assertRaises(SystemExit):
  52. celery.Worker(embed_clockservice=True)
  53. def test_tasklist(self):
  54. celery = Celery(set_as_current=False)
  55. worker = celery.Worker()
  56. self.assertTrue(worker.app.tasks)
  57. self.assertTrue(worker.app.finalized)
  58. self.assertTrue(worker.tasklist(include_builtins=True))
  59. worker.tasklist(include_builtins=False)
  60. def test_extra_info(self):
  61. celery = Celery(set_as_current=False)
  62. worker = celery.Worker()
  63. worker.loglevel = logging.WARNING
  64. self.assertFalse(worker.extra_info())
  65. worker.loglevel = logging.INFO
  66. self.assertTrue(worker.extra_info())
  67. @disable_stdouts
  68. def test_loglevel_string(self):
  69. worker = self.Worker(loglevel="INFO")
  70. self.assertEqual(worker.loglevel, logging.INFO)
  71. def test_run_worker(self):
  72. handlers = {}
  73. class Signals(platforms.Signals):
  74. def __setitem__(self, sig, handler):
  75. handlers[sig] = handler
  76. p = platforms.signals
  77. platforms.signals = Signals()
  78. try:
  79. w = self.Worker()
  80. w._isatty = False
  81. w.run_worker()
  82. for sig in "SIGINT", "SIGHUP", "SIGTERM":
  83. self.assertIn(sig, handlers)
  84. handlers.clear()
  85. w = self.Worker()
  86. w._isatty = True
  87. w.run_worker()
  88. for sig in "SIGINT", "SIGTERM":
  89. self.assertIn(sig, handlers)
  90. self.assertNotIn("SIGHUP", handlers)
  91. finally:
  92. platforms.signals = p
  93. @disable_stdouts
  94. def test_startup_info(self):
  95. worker = self.Worker()
  96. worker.run()
  97. self.assertTrue(worker.startup_info())
  98. worker.loglevel = logging.DEBUG
  99. self.assertTrue(worker.startup_info())
  100. worker.loglevel = logging.INFO
  101. self.assertTrue(worker.startup_info())
  102. worker.autoscale = 13, 10
  103. self.assertTrue(worker.startup_info())
  104. @disable_stdouts
  105. def test_run(self):
  106. self.Worker().run()
  107. self.Worker(discard=True).run()
  108. worker = self.Worker()
  109. worker.init_loader()
  110. worker.run()
  111. @disable_stdouts
  112. def test_purge_messages(self):
  113. self.Worker().purge_messages()
  114. @disable_stdouts
  115. def test_init_queues(self):
  116. app = current_app
  117. c = app.conf
  118. p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
  119. "celery": {"exchange": "celery",
  120. "binding_key": "celery"},
  121. "video": {"exchange": "video",
  122. "binding_key": "video"}})
  123. try:
  124. worker = self.Worker(queues=["video"])
  125. worker.init_queues()
  126. self.assertIn("video", app.amqp.queues)
  127. self.assertIn("video", app.amqp.queues.consume_from)
  128. self.assertIn("celery", app.amqp.queues)
  129. self.assertNotIn("celery", app.amqp.queues.consume_from)
  130. c.CELERY_CREATE_MISSING_QUEUES = False
  131. with self.assertRaises(ImproperlyConfigured):
  132. self.Worker(queues=["image"]).init_queues()
  133. c.CELERY_CREATE_MISSING_QUEUES = True
  134. worker = self.Worker(queues=["image"])
  135. worker.init_queues()
  136. self.assertIn("image", app.amqp.queues.consume_from)
  137. self.assertDictContainsSubset({"exchange": "image",
  138. "routing_key": "image",
  139. "binding_key": "image",
  140. "exchange_type": "direct"},
  141. app.amqp.queues["image"])
  142. finally:
  143. app.amqp.queues = p
  144. @disable_stdouts
  145. def test_autoscale_argument(self):
  146. worker1 = self.Worker(autoscale="10,3")
  147. self.assertListEqual(worker1.autoscale, [10, 3])
  148. worker2 = self.Worker(autoscale="10")
  149. self.assertListEqual(worker2.autoscale, [10, 0])
  150. def test_include_argument(self):
  151. worker1 = self.Worker(include="some.module")
  152. self.assertListEqual(worker1.include, ["some.module"])
  153. worker2 = self.Worker(include="some.module,another.package")
  154. self.assertListEqual(worker2.include, ["some.module",
  155. "another.package"])
  156. worker3 = self.Worker(include="os,sys")
  157. worker3.init_loader()
  158. @disable_stdouts
  159. def test_unknown_loglevel(self):
  160. with self.assertRaises(SystemExit):
  161. self.Worker(loglevel="ALIEN")
  162. worker1 = self.Worker(loglevel=0xFFFF)
  163. self.assertEqual(worker1.loglevel, 0xFFFF)
  164. def test_warns_if_running_as_privileged_user(self):
  165. app = current_app
  166. if app.IS_WINDOWS:
  167. raise SkipTest("Not applicable on Windows")
  168. def getuid():
  169. return 0
  170. prev, os.getuid = os.getuid, getuid
  171. try:
  172. with self.assertWarnsRegex(RuntimeWarning,
  173. r'superuser privileges is discouraged'):
  174. worker = self.Worker()
  175. worker.run()
  176. finally:
  177. os.getuid = prev
  178. @disable_stdouts
  179. def test_use_pidfile(self):
  180. from celery import platforms
  181. class create_pidlock(object):
  182. instance = [None]
  183. def __init__(self, file):
  184. self.file = file
  185. self.instance[0] = self
  186. def acquire(self):
  187. self.acquired = True
  188. class Object(object):
  189. def release(self):
  190. pass
  191. return Object()
  192. prev, platforms.create_pidlock = platforms.create_pidlock, \
  193. create_pidlock
  194. try:
  195. worker = self.Worker(pidfile="pidfilelockfilepid")
  196. worker.run_worker()
  197. self.assertTrue(create_pidlock.instance[0].acquired)
  198. finally:
  199. platforms.create_pidlock = prev
  200. @disable_stdouts
  201. def test_redirect_stdouts(self):
  202. worker = self.Worker()
  203. worker.redirect_stdouts = False
  204. worker.redirect_stdouts_to_logger()
  205. with self.assertRaises(AttributeError):
  206. sys.stdout.logger
  207. def test_redirect_stdouts_already_handled(self):
  208. logging_setup = [False]
  209. @signals.setup_logging.connect
  210. def on_logging_setup(**kwargs):
  211. logging_setup[0] = True
  212. try:
  213. worker = self.Worker()
  214. worker.app.log.__class__._setup = False
  215. worker.redirect_stdouts_to_logger()
  216. self.assertTrue(logging_setup[0])
  217. with self.assertRaises(AttributeError):
  218. sys.stdout.logger
  219. finally:
  220. signals.setup_logging.disconnect(on_logging_setup)
  221. @disable_stdouts
  222. def test_platform_tweaks_osx(self):
  223. class OSXWorker(self.Worker):
  224. proxy_workaround_installed = False
  225. def osx_proxy_detection_workaround(self):
  226. self.proxy_workaround_installed = True
  227. worker = OSXWorker()
  228. def install_HUP_nosupport(controller):
  229. controller.hup_not_supported_installed = True
  230. class Controller(object):
  231. pass
  232. prev = cd.install_HUP_not_supported_handler
  233. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  234. try:
  235. worker.app.IS_OSX = True
  236. controller = Controller()
  237. worker.install_platform_tweaks(controller)
  238. self.assertTrue(controller.hup_not_supported_installed)
  239. self.assertTrue(worker.proxy_workaround_installed)
  240. finally:
  241. cd.install_HUP_not_supported_handler = prev
  242. @disable_stdouts
  243. def test_general_platform_tweaks(self):
  244. restart_worker_handler_installed = [False]
  245. def install_worker_restart_handler(worker):
  246. restart_worker_handler_installed[0] = True
  247. class Controller(object):
  248. pass
  249. prev = cd.install_worker_restart_handler
  250. cd.install_worker_restart_handler = install_worker_restart_handler
  251. try:
  252. worker = self.Worker()
  253. worker.app.IS_OSX = False
  254. worker.install_platform_tweaks(Controller())
  255. self.assertTrue(restart_worker_handler_installed[0])
  256. finally:
  257. cd.install_worker_restart_handler = prev
  258. @disable_stdouts
  259. def test_on_consumer_ready(self):
  260. worker_ready_sent = [False]
  261. @signals.worker_ready.connect
  262. def on_worker_ready(**kwargs):
  263. worker_ready_sent[0] = True
  264. self.Worker().on_consumer_ready(object())
  265. self.assertTrue(worker_ready_sent[0])
  266. class test_funs(AppCase):
  267. @disable_stdouts
  268. def test_set_process_status(self):
  269. try:
  270. __import__("setproctitle")
  271. except ImportError:
  272. raise SkipTest("setproctitle not installed")
  273. worker = Worker(hostname="xyzza")
  274. prev1, sys.argv = sys.argv, ["Arg0"]
  275. try:
  276. st = worker.set_process_status("Running")
  277. self.assertIn("celeryd", st)
  278. self.assertIn("xyzza", st)
  279. self.assertIn("Running", st)
  280. prev2, sys.argv = sys.argv, ["Arg0", "Arg1"]
  281. try:
  282. st = worker.set_process_status("Running")
  283. self.assertIn("celeryd", st)
  284. self.assertIn("xyzza", st)
  285. self.assertIn("Running", st)
  286. self.assertIn("Arg1", st)
  287. finally:
  288. sys.argv = prev2
  289. finally:
  290. sys.argv = prev1
  291. @disable_stdouts
  292. def test_parse_options(self):
  293. cmd = WorkerCommand()
  294. cmd.app = current_app
  295. opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
  296. self.assertEqual(opts.concurrency, 512)
  297. @disable_stdouts
  298. def test_main(self):
  299. p, cd.Worker = cd.Worker, Worker
  300. s, sys.argv = sys.argv, ["celeryd", "--discard"]
  301. try:
  302. celeryd_main()
  303. finally:
  304. cd.Worker = p
  305. sys.argv = s
  306. class test_signal_handlers(AppCase):
  307. class _Worker(object):
  308. stopped = False
  309. terminated = False
  310. def stop(self, in_sighandler=False):
  311. self.stopped = True
  312. def terminate(self, in_sighandler=False):
  313. self.terminated = True
  314. def psig(self, fun, *args, **kwargs):
  315. handlers = {}
  316. class Signals(platforms.Signals):
  317. def __setitem__(self, sig, handler):
  318. handlers[sig] = handler
  319. p, platforms.signals = platforms.signals, Signals()
  320. try:
  321. fun(*args, **kwargs)
  322. return handlers
  323. finally:
  324. platforms.signals = p
  325. @disable_stdouts
  326. def test_worker_int_handler(self):
  327. worker = self._Worker()
  328. handlers = self.psig(cd.install_worker_int_handler, worker)
  329. next_handlers = {}
  330. class Signals(platforms.Signals):
  331. def __setitem__(self, sig, handler):
  332. next_handlers[sig] = handler
  333. p, platforms.signals = platforms.signals, Signals()
  334. try:
  335. with self.assertRaises(SystemExit):
  336. handlers["SIGINT"]("SIGINT", object())
  337. self.assertTrue(worker.stopped)
  338. finally:
  339. platforms.signals = p
  340. with self.assertRaises(SystemExit):
  341. next_handlers["SIGINT"]("SIGINT", object())
  342. self.assertTrue(worker.terminated)
  343. @disable_stdouts
  344. def test_worker_int_handler_only_stop_MainProcess(self):
  345. try:
  346. import _multiprocessing
  347. except ImportError:
  348. raise SkipTest("only relevant for multiprocessing")
  349. process = current_process()
  350. name, process.name = process.name, "OtherProcess"
  351. try:
  352. worker = self._Worker()
  353. handlers = self.psig(cd.install_worker_int_handler, worker)
  354. with self.assertRaises(SystemExit):
  355. handlers["SIGINT"]("SIGINT", object())
  356. self.assertFalse(worker.stopped)
  357. finally:
  358. process.name = name
  359. @disable_stdouts
  360. def test_install_HUP_not_supported_handler(self):
  361. worker = self._Worker()
  362. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  363. handlers["SIGHUP"]("SIGHUP", object())
  364. @disable_stdouts
  365. def test_worker_int_again_handler_only_stop_MainProcess(self):
  366. try:
  367. import _multiprocessing
  368. except ImportError:
  369. raise SkipTest("only relevant for multiprocessing")
  370. process = current_process()
  371. name, process.name = process.name, "OtherProcess"
  372. try:
  373. worker = self._Worker()
  374. handlers = self.psig(cd.install_worker_int_again_handler, worker)
  375. with self.assertRaises(SystemExit):
  376. handlers["SIGINT"]("SIGINT", object())
  377. self.assertFalse(worker.terminated)
  378. finally:
  379. process.name = name
  380. @disable_stdouts
  381. def test_worker_term_handler(self):
  382. worker = self._Worker()
  383. handlers = self.psig(cd.install_worker_term_handler, worker)
  384. with self.assertRaises(SystemExit):
  385. handlers["SIGTERM"]("SIGTERM", object())
  386. self.assertTrue(worker.stopped)
  387. @patch("celery.apps.worker.logger")
  388. def test_worker_cry_handler(self, logger):
  389. if sys.platform.startswith("java"):
  390. raise SkipTest("Cry handler does not work on Jython")
  391. if hasattr(sys, "pypy_version_info"):
  392. raise SkipTest("Cry handler does not work on PyPy")
  393. if sys.version_info > (2, 5):
  394. handlers = self.psig(cd.install_cry_handler)
  395. self.assertIsNone(handlers["SIGUSR1"]("SIGUSR1", object()))
  396. self.assertTrue(logger.error.called)
  397. else:
  398. raise SkipTest("Needs Python 2.5 or later")
  399. @disable_stdouts
  400. def test_worker_term_handler_only_stop_MainProcess(self):
  401. try:
  402. import _multiprocessing
  403. except ImportError:
  404. raise SkipTest("only relevant for multiprocessing")
  405. process = current_process()
  406. name, process.name = process.name, "OtherProcess"
  407. try:
  408. worker = self._Worker()
  409. handlers = self.psig(cd.install_worker_term_handler, worker)
  410. with self.assertRaises(SystemExit):
  411. handlers["SIGTERM"]("SIGTERM", object())
  412. self.assertFalse(worker.stopped)
  413. finally:
  414. process.name = name
  415. @disable_stdouts
  416. def test_worker_restart_handler(self):
  417. if getattr(os, "execv", None) is None:
  418. raise SkipTest("platform does not have excv")
  419. argv = []
  420. def _execv(*args):
  421. argv.extend(args)
  422. execv, os.execv = os.execv, _execv
  423. try:
  424. worker = self._Worker()
  425. handlers = self.psig(cd.install_worker_restart_handler, worker)
  426. handlers["SIGHUP"]("SIGHUP", object())
  427. self.assertTrue(worker.stopped)
  428. self.assertTrue(argv)
  429. finally:
  430. os.execv = execv
  431. @disable_stdouts
  432. def test_worker_term_hard_handler(self):
  433. worker = self._Worker()
  434. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  435. with self.assertRaises(SystemTerminate):
  436. handlers["SIGQUIT"]("SIGQUIT", object())
  437. self.assertTrue(worker.terminated)