test_celeryd.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import logging
  4. import os
  5. import sys
  6. import warnings
  7. from functools import wraps
  8. try:
  9. from multiprocessing import current_process
  10. except ImportError:
  11. current_process = None # noqa
  12. from mock import patch
  13. from nose import SkipTest
  14. from kombu.tests.utils import redirect_stdouts
  15. from celery import Celery
  16. from celery import platforms
  17. from celery import signals
  18. from celery import current_app
  19. from celery.apps import worker as cd
  20. from celery.bin.celeryd import WorkerCommand, windows_main, \
  21. main as celeryd_main
  22. from celery.exceptions import ImproperlyConfigured
  23. from celery.tests.compat import catch_warnings
  24. from celery.tests.utils import (AppCase, WhateverIO, mask_modules,
  25. reset_modules, skip_unless_module)
  26. from celery.utils.patch import ensure_process_aware_logger
# Runs once at import time, before any test in this module executes.
# NOTE(review): presumably patches ``logging`` so records carry process
# information -- confirm against celery.utils.patch.
ensure_process_aware_logger()
  28. def disable_stdouts(fun):
  29. @wraps(fun)
  30. def disable(*args, **kwargs):
  31. sys.stdout, sys.stderr = WhateverIO(), WhateverIO()
  32. try:
  33. return fun(*args, **kwargs)
  34. finally:
  35. sys.stdout = sys.__stdout__
  36. sys.stderr = sys.__stderr__
  37. return disable
  38. class _WorkController(object):
  39. def __init__(self, *args, **kwargs):
  40. self.logger = current_app.log.get_default_logger()
  41. def start(self):
  42. pass
class Worker(cd.Worker):
    """``cd.Worker`` subclass used as the worker class in this module."""

    # Point the worker at the no-op ``_WorkController`` stub.
    # NOTE(review): presumably ``cd.Worker`` instantiates this attribute
    # when it runs -- confirm in celery.apps.worker.
    WorkController = _WorkController
  45. class test_compilation(AppCase):
  46. def test_no_multiprocessing(self):
  47. with mask_modules("multiprocessing"):
  48. with reset_modules("celery.apps.worker"):
  49. from celery.apps.worker import multiprocessing
  50. self.assertIsNone(multiprocessing)
  51. def test_cpu_count_no_mp(self):
  52. with mask_modules("multiprocessing"):
  53. with reset_modules("celery.apps.worker"):
  54. from celery.apps.worker import cpu_count
  55. self.assertEqual(cpu_count(), 2)
  56. @skip_unless_module("multiprocessing")
  57. def test_no_cpu_count(self):
  58. @patch("multiprocessing.cpu_count")
  59. def _do_test(pcount):
  60. pcount.side_effect = NotImplementedError("cpu_count")
  61. from celery.apps.worker import cpu_count
  62. self.assertEqual(cpu_count(), 2)
  63. pcount.assert_called_with()
  64. _do_test()
  65. def test_process_name_wo_mp(self):
  66. with mask_modules("multiprocessing"):
  67. with reset_modules("celery.apps.worker"):
  68. from celery.apps.worker import get_process_name
  69. self.assertIsNone(get_process_name())
  70. @skip_unless_module("multiprocessing")
  71. def test_process_name_w_mp(self):
  72. @patch("multiprocessing.current_process")
  73. def _do_test(current_process):
  74. from celery.apps.worker import get_process_name
  75. self.assertTrue(get_process_name())
  76. _do_test()
  77. class test_Worker(AppCase):
  78. Worker = Worker
  79. @disable_stdouts
  80. def test_queues_string(self):
  81. celery = Celery(set_as_current=False)
  82. worker = celery.Worker(queues="foo,bar,baz")
  83. worker.init_queues()
  84. self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
  85. self.assertTrue("foo" in celery.amqp.queues)
  86. @disable_stdouts
  87. def test_windows_B_option(self):
  88. celery = Celery(set_as_current=False)
  89. celery.IS_WINDOWS = True
  90. with self.assertRaises(SystemExit):
  91. celery.Worker(embed_clockservice=True)
  92. def test_tasklist(self):
  93. celery = Celery(set_as_current=False)
  94. worker = celery.Worker()
  95. self.assertTrue(worker.tasklist(include_builtins=True))
  96. worker.tasklist(include_builtins=False)
  97. def test_extra_info(self):
  98. celery = Celery(set_as_current=False)
  99. worker = celery.Worker()
  100. worker.loglevel = logging.WARNING
  101. self.assertFalse(worker.extra_info())
  102. worker.loglevel = logging.INFO
  103. self.assertTrue(worker.extra_info())
  104. @disable_stdouts
  105. def test_loglevel_string(self):
  106. worker = self.Worker(loglevel="INFO")
  107. self.assertEqual(worker.loglevel, logging.INFO)
  108. def test_run_worker(self):
  109. handlers = {}
  110. class Signals(platforms.Signals):
  111. def __setitem__(self, sig, handler):
  112. handlers[sig] = handler
  113. p = platforms.signals
  114. platforms.signals = Signals()
  115. try:
  116. w = self.Worker()
  117. w._isatty = False
  118. w.run_worker()
  119. for sig in "SIGINT", "SIGHUP", "SIGTERM":
  120. self.assertIn(sig, handlers)
  121. handlers.clear()
  122. w = self.Worker()
  123. w._isatty = True
  124. w.run_worker()
  125. for sig in "SIGINT", "SIGTERM":
  126. self.assertIn(sig, handlers)
  127. self.assertNotIn("SIGHUP", handlers)
  128. finally:
  129. platforms.signals = p
  130. @disable_stdouts
  131. def test_startup_info(self):
  132. worker = self.Worker()
  133. worker.run()
  134. self.assertTrue(worker.startup_info())
  135. worker.loglevel = logging.DEBUG
  136. self.assertTrue(worker.startup_info())
  137. worker.loglevel = logging.INFO
  138. self.assertTrue(worker.startup_info())
  139. worker.autoscale = 13, 10
  140. self.assertTrue(worker.startup_info())
  141. @disable_stdouts
  142. def test_run(self):
  143. self.Worker().run()
  144. self.Worker(discard=True).run()
  145. worker = self.Worker()
  146. worker.init_loader()
  147. worker.run()
  148. @disable_stdouts
  149. def test_purge_messages(self):
  150. self.Worker().purge_messages()
  151. @disable_stdouts
  152. def test_init_queues(self):
  153. app = current_app
  154. c = app.conf
  155. p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
  156. "celery": {"exchange": "celery",
  157. "binding_key": "celery"},
  158. "video": {"exchange": "video",
  159. "binding_key": "video"}})
  160. try:
  161. worker = self.Worker(queues=["video"])
  162. worker.init_queues()
  163. self.assertIn("video", app.amqp.queues)
  164. self.assertIn("video", app.amqp.queues.consume_from)
  165. self.assertIn("celery", app.amqp.queues)
  166. self.assertNotIn("celery", app.amqp.queues.consume_from)
  167. c.CELERY_CREATE_MISSING_QUEUES = False
  168. with self.assertRaises(ImproperlyConfigured):
  169. self.Worker(queues=["image"]).init_queues()
  170. c.CELERY_CREATE_MISSING_QUEUES = True
  171. worker = self.Worker(queues=["image"])
  172. worker.init_queues()
  173. self.assertIn("image", app.amqp.queues.consume_from)
  174. self.assertDictContainsSubset({"exchange": "image",
  175. "routing_key": "image",
  176. "binding_key": "image",
  177. "exchange_type": "direct"},
  178. app.amqp.queues["image"])
  179. finally:
  180. app.amqp.queues = p
  181. @disable_stdouts
  182. def test_autoscale_argument(self):
  183. worker1 = self.Worker(autoscale="10,3")
  184. self.assertListEqual(worker1.autoscale, [10, 3])
  185. worker2 = self.Worker(autoscale="10")
  186. self.assertListEqual(worker2.autoscale, [10, 0])
  187. def test_include_argument(self):
  188. worker1 = self.Worker(include="some.module")
  189. self.assertListEqual(worker1.include, ["some.module"])
  190. worker2 = self.Worker(include="some.module,another.package")
  191. self.assertListEqual(worker2.include, ["some.module",
  192. "another.package"])
  193. worker3 = self.Worker(include="os,sys")
  194. worker3.init_loader()
  195. @disable_stdouts
  196. def test_unknown_loglevel(self):
  197. with self.assertRaises(SystemExit):
  198. self.Worker(loglevel="ALIEN")
  199. worker1 = self.Worker(loglevel=0xFFFF)
  200. self.assertEqual(worker1.loglevel, 0xFFFF)
  201. def test_warns_if_running_as_privileged_user(self):
  202. app = current_app
  203. if app.IS_WINDOWS:
  204. raise SkipTest("Not applicable on Windows")
  205. warnings.resetwarnings()
  206. def getuid():
  207. return 0
  208. prev, os.getuid = os.getuid, getuid
  209. try:
  210. with catch_warnings(record=True) as log:
  211. worker = self.Worker()
  212. worker.run()
  213. self.assertTrue(log)
  214. self.assertIn("superuser privileges is discouraged",
  215. log[0].message.args[0])
  216. finally:
  217. os.getuid = prev
  218. @disable_stdouts
  219. def test_use_pidfile(self):
  220. from celery import platforms
  221. class create_pidlock(object):
  222. instance = [None]
  223. def __init__(self, file):
  224. self.file = file
  225. self.instance[0] = self
  226. def acquire(self):
  227. self.acquired = True
  228. class Object(object):
  229. def release(self):
  230. pass
  231. return Object()
  232. prev, platforms.create_pidlock = platforms.create_pidlock, \
  233. create_pidlock
  234. try:
  235. worker = self.Worker(pidfile="pidfilelockfilepid")
  236. worker.run_worker()
  237. self.assertTrue(create_pidlock.instance[0].acquired)
  238. finally:
  239. platforms.create_pidlock = prev
  240. @disable_stdouts
  241. def test_redirect_stdouts(self):
  242. worker = self.Worker()
  243. worker.redirect_stdouts = False
  244. worker.redirect_stdouts_to_logger()
  245. with self.assertRaises(AttributeError):
  246. sys.stdout.logger
  247. def test_redirect_stdouts_already_handled(self):
  248. logging_setup = [False]
  249. def on_logging_setup(**kwargs):
  250. logging_setup[0] = True
  251. signals.setup_logging.connect(on_logging_setup)
  252. try:
  253. worker = self.Worker()
  254. worker.app.log.__class__._setup = False
  255. worker.redirect_stdouts_to_logger()
  256. self.assertTrue(logging_setup[0])
  257. with self.assertRaises(AttributeError):
  258. sys.stdout.logger
  259. finally:
  260. signals.setup_logging.disconnect(on_logging_setup)
  261. @disable_stdouts
  262. def test_platform_tweaks_osx(self):
  263. class OSXWorker(self.Worker):
  264. proxy_workaround_installed = False
  265. def osx_proxy_detection_workaround(self):
  266. self.proxy_workaround_installed = True
  267. worker = OSXWorker()
  268. def install_HUP_nosupport(controller):
  269. controller.hup_not_supported_installed = True
  270. class Controller(object):
  271. logger = logging.getLogger("celery.tests")
  272. prev = cd.install_HUP_not_supported_handler
  273. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  274. try:
  275. worker.app.IS_OSX = True
  276. controller = Controller()
  277. worker.install_platform_tweaks(controller)
  278. self.assertTrue(controller.hup_not_supported_installed)
  279. self.assertTrue(worker.proxy_workaround_installed)
  280. finally:
  281. cd.install_HUP_not_supported_handler = prev
  282. @disable_stdouts
  283. def test_general_platform_tweaks(self):
  284. restart_worker_handler_installed = [False]
  285. def install_worker_restart_handler(worker):
  286. restart_worker_handler_installed[0] = True
  287. class Controller(object):
  288. logger = logging.getLogger("celery.tests")
  289. prev = cd.install_worker_restart_handler
  290. cd.install_worker_restart_handler = install_worker_restart_handler
  291. try:
  292. worker = self.Worker()
  293. worker.app.IS_OSX = False
  294. worker.install_platform_tweaks(Controller())
  295. self.assertTrue(restart_worker_handler_installed[0])
  296. finally:
  297. cd.install_worker_restart_handler = prev
  298. @disable_stdouts
  299. def test_on_consumer_ready(self):
  300. worker_ready_sent = [False]
  301. def on_worker_ready(**kwargs):
  302. worker_ready_sent[0] = True
  303. signals.worker_ready.connect(on_worker_ready)
  304. self.Worker().on_consumer_ready(object())
  305. self.assertTrue(worker_ready_sent[0])
  306. class test_funs(AppCase):
  307. @redirect_stdouts
  308. def test_windows_main(self, stdout, stderr):
  309. windows_main()
  310. self.assertIn("celeryd command does not work on Windows",
  311. stderr.getvalue())
  312. @disable_stdouts
  313. def test_set_process_status(self):
  314. try:
  315. __import__("setproctitle")
  316. except ImportError:
  317. raise SkipTest("setproctitle not installed")
  318. worker = Worker(hostname="xyzza")
  319. prev1, sys.argv = sys.argv, ["Arg0"]
  320. try:
  321. st = worker.set_process_status("Running")
  322. self.assertIn("celeryd", st)
  323. self.assertIn("xyzza", st)
  324. self.assertIn("Running", st)
  325. prev2, sys.argv = sys.argv, ["Arg0", "Arg1"]
  326. try:
  327. st = worker.set_process_status("Running")
  328. self.assertIn("celeryd", st)
  329. self.assertIn("xyzza", st)
  330. self.assertIn("Running", st)
  331. self.assertIn("Arg1", st)
  332. finally:
  333. sys.argv = prev2
  334. finally:
  335. sys.argv = prev1
  336. @disable_stdouts
  337. def test_parse_options(self):
  338. cmd = WorkerCommand()
  339. cmd.app = current_app
  340. opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
  341. self.assertEqual(opts.concurrency, 512)
  342. @disable_stdouts
  343. def test_main(self):
  344. p, cd.Worker = cd.Worker, Worker
  345. s, sys.argv = sys.argv, ["celeryd", "--discard"]
  346. try:
  347. celeryd_main()
  348. finally:
  349. cd.Worker = p
  350. sys.argv = s
  351. class test_signal_handlers(AppCase):
  352. class _Worker(object):
  353. stopped = False
  354. terminated = False
  355. logger = current_app.log.get_default_logger()
  356. def stop(self, in_sighandler=False):
  357. self.stopped = True
  358. def terminate(self, in_sighandler=False):
  359. self.terminated = True
  360. def psig(self, fun, *args, **kwargs):
  361. handlers = {}
  362. class Signals(platforms.Signals):
  363. def __setitem__(self, sig, handler):
  364. handlers[sig] = handler
  365. p, platforms.signals = platforms.signals, Signals()
  366. try:
  367. fun(*args, **kwargs)
  368. return handlers
  369. finally:
  370. platforms.signals = p
  371. @disable_stdouts
  372. def test_worker_int_handler(self):
  373. worker = self._Worker()
  374. handlers = self.psig(cd.install_worker_int_handler, worker)
  375. next_handlers = {}
  376. class Signals(platforms.Signals):
  377. def __setitem__(self, sig, handler):
  378. next_handlers[sig] = handler
  379. p, platforms.signals = platforms.signals, Signals()
  380. try:
  381. with self.assertRaises(SystemExit):
  382. handlers["SIGINT"]("SIGINT", object())
  383. self.assertTrue(worker.stopped)
  384. finally:
  385. platforms.signals = p
  386. with self.assertRaises(SystemExit):
  387. next_handlers["SIGINT"]("SIGINT", object())
  388. self.assertTrue(worker.terminated)
  389. @disable_stdouts
  390. def test_worker_int_handler_only_stop_MainProcess(self):
  391. if current_process is None:
  392. raise SkipTest("only relevant for multiprocessing")
  393. process = current_process()
  394. name, process.name = process.name, "OtherProcess"
  395. try:
  396. worker = self._Worker()
  397. handlers = self.psig(cd.install_worker_int_handler, worker)
  398. with self.assertRaises(SystemExit):
  399. handlers["SIGINT"]("SIGINT", object())
  400. self.assertFalse(worker.stopped)
  401. finally:
  402. process.name = name
  403. @disable_stdouts
  404. def test_install_HUP_not_supported_handler(self):
  405. worker = self._Worker()
  406. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  407. handlers["SIGHUP"]("SIGHUP", object())
  408. @disable_stdouts
  409. def test_worker_int_again_handler_only_stop_MainProcess(self):
  410. if current_process is None:
  411. raise SkipTest("only relevant for multiprocessing")
  412. process = current_process()
  413. name, process.name = process.name, "OtherProcess"
  414. try:
  415. worker = self._Worker()
  416. handlers = self.psig(cd.install_worker_int_again_handler, worker)
  417. with self.assertRaises(SystemExit):
  418. handlers["SIGINT"]("SIGINT", object())
  419. self.assertFalse(worker.terminated)
  420. finally:
  421. process.name = name
  422. @disable_stdouts
  423. def test_worker_term_handler(self):
  424. worker = self._Worker()
  425. handlers = self.psig(cd.install_worker_term_handler, worker)
  426. with self.assertRaises(SystemExit):
  427. handlers["SIGTERM"]("SIGTERM", object())
  428. self.assertTrue(worker.stopped)
  429. def test_worker_cry_handler(self):
  430. if sys.platform.startswith("java"):
  431. raise SkipTest("Cry handler does not work on Jython")
  432. if hasattr(sys, "pypy_version_info"):
  433. raise SkipTest("Cry handler does not work on PyPy")
  434. if sys.version_info > (2, 5):
  435. class Logger(object):
  436. _errors = []
  437. def error(self, msg, *args, **kwargs):
  438. self._errors.append(msg)
  439. logger = Logger()
  440. handlers = self.psig(cd.install_cry_handler, logger)
  441. self.assertIsNone(handlers["SIGUSR1"]("SIGUSR1", object()))
  442. self.assertTrue(Logger._errors)
  443. else:
  444. raise SkipTest("Needs Python 2.5 or later")
  445. @disable_stdouts
  446. def test_worker_term_handler_only_stop_MainProcess(self):
  447. if current_process is None:
  448. raise SkipTest("only relevant for multiprocessing")
  449. process = current_process()
  450. name, process.name = process.name, "OtherProcess"
  451. try:
  452. worker = self._Worker()
  453. handlers = self.psig(cd.install_worker_term_handler, worker)
  454. with self.assertRaises(SystemExit):
  455. handlers["SIGTERM"]("SIGTERM", object())
  456. self.assertFalse(worker.stopped)
  457. finally:
  458. process.name = name
  459. @disable_stdouts
  460. def test_worker_restart_handler(self):
  461. if getattr(os, "execv", None) is None:
  462. raise SkipTest("platform does not have excv")
  463. argv = []
  464. def _execv(*args):
  465. argv.extend(args)
  466. execv, os.execv = os.execv, _execv
  467. try:
  468. worker = self._Worker()
  469. handlers = self.psig(cd.install_worker_restart_handler, worker)
  470. handlers["SIGHUP"]("SIGHUP", object())
  471. self.assertTrue(worker.stopped)
  472. self.assertTrue(argv)
  473. finally:
  474. os.execv = execv