test_celeryd.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. from __future__ import with_statement
  2. import logging
  3. import os
  4. import sys
  5. import warnings
  6. from functools import wraps
  7. try:
  8. from multiprocessing import current_process
  9. except ImportError:
  10. current_process = None # noqa
  11. from mock import patch
  12. from nose import SkipTest
  13. from kombu.tests.utils import redirect_stdouts
  14. from celery import Celery
  15. from celery import platforms
  16. from celery import signals
  17. from celery import current_app
  18. from celery.apps import worker as cd
  19. from celery.bin.celeryd import WorkerCommand, windows_main, \
  20. main as celeryd_main
  21. from celery.exceptions import ImproperlyConfigured
  22. from celery.tests.compat import catch_warnings
  23. from celery.tests.utils import (AppCase, StringIO, mask_modules,
  24. reset_modules, skip_unless_module)
  25. from celery.utils.patch import ensure_process_aware_logger
# Runs once at import time, before any of the test classes below are defined.
# NOTE(review): presumably patches the logging machinery so log records carry
# the process name -- confirm against celery.utils.patch.
ensure_process_aware_logger()
  27. def disable_stdouts(fun):
  28. @wraps(fun)
  29. def disable(*args, **kwargs):
  30. sys.stdout, sys.stderr = StringIO(), StringIO()
  31. try:
  32. return fun(*args, **kwargs)
  33. finally:
  34. sys.stdout = sys.__stdout__
  35. sys.stderr = sys.__stderr__
  36. return disable
  37. class _WorkController(object):
  38. def __init__(self, *args, **kwargs):
  39. self.logger = current_app.log.get_default_logger()
  40. def start(self):
  41. pass
  42. class Worker(cd.Worker):
  43. WorkController = _WorkController
  44. class test_compilation(AppCase):
  45. def test_no_multiprocessing(self):
  46. with mask_modules("multiprocessing"):
  47. with reset_modules("celery.apps.worker"):
  48. from celery.apps.worker import multiprocessing
  49. self.assertIsNone(multiprocessing)
  50. def test_cpu_count_no_mp(self):
  51. with mask_modules("multiprocessing"):
  52. with reset_modules("celery.apps.worker"):
  53. from celery.apps.worker import cpu_count
  54. self.assertEqual(cpu_count(), 2)
  55. @skip_unless_module("multiprocessing")
  56. def test_no_cpu_count(self, pcount):
  57. @patch("multiprocessing.cpu_count")
  58. def _do_test():
  59. pcount.side_effect = NotImplementedError("cpu_count")
  60. from celery.apps.worker import cpu_count
  61. self.assertEqual(cpu_count(), 2)
  62. pcount.assert_called_with()
  63. _do_test()
  64. def test_process_name_wo_mp(self):
  65. with mask_modules("multiprocessing"):
  66. with reset_modules("celery.apps.worker"):
  67. from celery.apps.worker import get_process_name
  68. self.assertIsNone(get_process_name())
  69. @skip_unless_module("multiprocessing")
  70. def test_process_name_w_mp(self, current_process):
  71. @patch("multiprocessing.current_process")
  72. def _do_test():
  73. from celery.apps.worker import get_process_name
  74. self.assertTrue(get_process_name())
  75. _do_test()
  76. class test_Worker(AppCase):
  77. Worker = Worker
  78. @disable_stdouts
  79. def test_queues_string(self):
  80. celery = Celery(set_as_current=False)
  81. worker = celery.Worker(queues="foo,bar,baz")
  82. worker.init_queues()
  83. self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
  84. self.assertTrue("foo" in celery.amqp.queues)
  85. @disable_stdouts
  86. def test_windows_B_option(self):
  87. celery = Celery(set_as_current=False)
  88. celery.IS_WINDOWS = True
  89. with self.assertRaises(SystemExit):
  90. celery.Worker(run_clockservice=True)
  91. def test_tasklist(self):
  92. celery = Celery(set_as_current=False)
  93. worker = celery.Worker()
  94. self.assertTrue(worker.tasklist(include_builtins=True))
  95. worker.tasklist(include_builtins=False)
  96. def test_extra_info(self):
  97. celery = Celery(set_as_current=False)
  98. worker = celery.Worker()
  99. worker.loglevel = logging.WARNING
  100. self.assertFalse(worker.extra_info())
  101. worker.loglevel = logging.INFO
  102. self.assertTrue(worker.extra_info())
  103. @disable_stdouts
  104. def test_loglevel_string(self):
  105. worker = self.Worker(loglevel="INFO")
  106. self.assertEqual(worker.loglevel, logging.INFO)
  107. def test_run_worker(self):
  108. handlers = {}
  109. class Signals(platforms.Signals):
  110. def __setitem__(self, sig, handler):
  111. handlers[sig] = handler
  112. p = platforms.signals
  113. platforms.signals = Signals()
  114. try:
  115. w = self.Worker()
  116. w._isatty = False
  117. w.run_worker()
  118. for sig in "SIGINT", "SIGHUP", "SIGTERM":
  119. self.assertIn(sig, handlers)
  120. handlers.clear()
  121. w = self.Worker()
  122. w._isatty = True
  123. w.run_worker()
  124. for sig in "SIGINT", "SIGTERM":
  125. self.assertIn(sig, handlers)
  126. self.assertNotIn("SIGHUP", handlers)
  127. finally:
  128. platforms.signals = p
  129. @disable_stdouts
  130. def test_startup_info(self):
  131. worker = self.Worker()
  132. worker.run()
  133. self.assertTrue(worker.startup_info())
  134. worker.loglevel = logging.DEBUG
  135. self.assertTrue(worker.startup_info())
  136. worker.loglevel = logging.INFO
  137. self.assertTrue(worker.startup_info())
  138. worker.autoscale = 13, 10
  139. self.assertTrue(worker.startup_info())
  140. @disable_stdouts
  141. def test_run(self):
  142. self.Worker().run()
  143. self.Worker(discard=True).run()
  144. worker = self.Worker()
  145. worker.init_loader()
  146. worker.run()
  147. @disable_stdouts
  148. def test_purge_messages(self):
  149. self.Worker().purge_messages()
  150. @disable_stdouts
  151. def test_init_queues(self):
  152. app = current_app
  153. c = app.conf
  154. p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
  155. "celery": {"exchange": "celery",
  156. "binding_key": "celery"},
  157. "video": {"exchange": "video",
  158. "binding_key": "video"}})
  159. try:
  160. worker = self.Worker(queues=["video"])
  161. worker.init_queues()
  162. self.assertIn("video", app.amqp.queues)
  163. self.assertIn("video", app.amqp.queues.consume_from)
  164. self.assertIn("celery", app.amqp.queues)
  165. self.assertNotIn("celery", app.amqp.queues.consume_from)
  166. c.CELERY_CREATE_MISSING_QUEUES = False
  167. self.assertRaises(ImproperlyConfigured,
  168. self.Worker(queues=["image"]).init_queues)
  169. c.CELERY_CREATE_MISSING_QUEUES = True
  170. worker = self.Worker(queues=["image"])
  171. worker.init_queues()
  172. self.assertIn("image", app.amqp.queues.consume_from)
  173. self.assertDictContainsSubset({"exchange": "image",
  174. "routing_key": "image",
  175. "binding_key": "image",
  176. "exchange_type": "direct"},
  177. app.amqp.queues["image"])
  178. finally:
  179. app.amqp.queues = p
  180. @disable_stdouts
  181. def test_autoscale_argument(self):
  182. worker1 = self.Worker(autoscale="10,3")
  183. self.assertListEqual(worker1.autoscale, [10, 3])
  184. worker2 = self.Worker(autoscale="10")
  185. self.assertListEqual(worker2.autoscale, [10, 0])
  186. def test_include_argument(self):
  187. worker1 = self.Worker(include="some.module")
  188. self.assertListEqual(worker1.include, ["some.module"])
  189. worker2 = self.Worker(include="some.module,another.package")
  190. self.assertListEqual(worker2.include, ["some.module",
  191. "another.package"])
  192. worker3 = self.Worker(include="os,sys")
  193. worker3.init_loader()
  194. @disable_stdouts
  195. def test_unknown_loglevel(self):
  196. self.assertRaises(SystemExit, self.Worker, loglevel="ALIEN")
  197. worker1 = self.Worker(loglevel=0xFFFF)
  198. self.assertEqual(worker1.loglevel, 0xFFFF)
  199. def test_warns_if_running_as_privileged_user(self):
  200. app = current_app
  201. if app.IS_WINDOWS:
  202. raise SkipTest("Not applicable on Windows")
  203. warnings.resetwarnings()
  204. def geteuid():
  205. return 0
  206. prev, os.geteuid = os.geteuid, geteuid
  207. try:
  208. with catch_warnings(record=True) as log:
  209. worker = self.Worker()
  210. worker.run()
  211. self.assertTrue(log)
  212. self.assertIn("superuser privileges is not encouraged",
  213. log[0].message.args[0])
  214. finally:
  215. os.geteuid = prev
  216. @disable_stdouts
  217. def test_use_pidfile(self):
  218. from celery import platforms
  219. class create_pidlock(object):
  220. instance = [None]
  221. def __init__(self, file):
  222. self.file = file
  223. self.instance[0] = self
  224. def acquire(self):
  225. self.acquired = True
  226. class Object(object):
  227. def release(self):
  228. pass
  229. return Object()
  230. prev, platforms.create_pidlock = platforms.create_pidlock, \
  231. create_pidlock
  232. try:
  233. worker = self.Worker(pidfile="pidfilelockfilepid")
  234. worker.run_worker()
  235. self.assertTrue(create_pidlock.instance[0].acquired)
  236. finally:
  237. platforms.create_pidlock = prev
  238. @disable_stdouts
  239. def test_redirect_stdouts(self):
  240. worker = self.Worker()
  241. worker.redirect_stdouts = False
  242. worker.redirect_stdouts_to_logger()
  243. self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
  244. def test_redirect_stdouts_already_handled(self):
  245. logging_setup = [False]
  246. def on_logging_setup(**kwargs):
  247. logging_setup[0] = True
  248. signals.setup_logging.connect(on_logging_setup)
  249. try:
  250. worker = self.Worker()
  251. worker.app.log.__class__._setup = False
  252. worker.redirect_stdouts_to_logger()
  253. self.assertTrue(logging_setup[0])
  254. self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
  255. finally:
  256. signals.setup_logging.disconnect(on_logging_setup)
  257. @disable_stdouts
  258. def test_platform_tweaks_osx(self):
  259. class OSXWorker(self.Worker):
  260. proxy_workaround_installed = False
  261. def osx_proxy_detection_workaround(self):
  262. self.proxy_workaround_installed = True
  263. worker = OSXWorker()
  264. def install_HUP_nosupport(controller):
  265. controller.hup_not_supported_installed = True
  266. class Controller(object):
  267. logger = logging.getLogger("celery.tests")
  268. prev = cd.install_HUP_not_supported_handler
  269. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  270. try:
  271. worker.app.IS_OSX = True
  272. controller = Controller()
  273. worker.install_platform_tweaks(controller)
  274. self.assertTrue(controller.hup_not_supported_installed)
  275. self.assertTrue(worker.proxy_workaround_installed)
  276. finally:
  277. cd.install_HUP_not_supported_handler = prev
  278. @disable_stdouts
  279. def test_general_platform_tweaks(self):
  280. restart_worker_handler_installed = [False]
  281. def install_worker_restart_handler(worker):
  282. restart_worker_handler_installed[0] = True
  283. class Controller(object):
  284. logger = logging.getLogger("celery.tests")
  285. prev = cd.install_worker_restart_handler
  286. cd.install_worker_restart_handler = install_worker_restart_handler
  287. try:
  288. worker = self.Worker()
  289. worker.app.IS_OSX = False
  290. worker.install_platform_tweaks(Controller())
  291. self.assertTrue(restart_worker_handler_installed[0])
  292. finally:
  293. cd.install_worker_restart_handler = prev
  294. @disable_stdouts
  295. def test_on_consumer_ready(self):
  296. worker_ready_sent = [False]
  297. def on_worker_ready(**kwargs):
  298. worker_ready_sent[0] = True
  299. signals.worker_ready.connect(on_worker_ready)
  300. self.Worker().on_consumer_ready(object())
  301. self.assertTrue(worker_ready_sent[0])
  302. class test_funs(AppCase):
  303. @redirect_stdouts
  304. def test_windows_main(self, stdout, stderr):
  305. windows_main()
  306. self.assertIn("celeryd command does not work on Windows",
  307. stderr.getvalue())
  308. @disable_stdouts
  309. def test_set_process_status(self):
  310. try:
  311. __import__("setproctitle")
  312. except ImportError:
  313. raise SkipTest("setproctitle not installed")
  314. worker = Worker(hostname="xyzza")
  315. prev1, sys.argv = sys.argv, ["Arg0"]
  316. try:
  317. st = worker.set_process_status("Running")
  318. self.assertIn("celeryd", st)
  319. self.assertIn("xyzza", st)
  320. self.assertIn("Running", st)
  321. prev2, sys.argv = sys.argv, ["Arg0", "Arg1"]
  322. try:
  323. st = worker.set_process_status("Running")
  324. self.assertIn("celeryd", st)
  325. self.assertIn("xyzza", st)
  326. self.assertIn("Running", st)
  327. self.assertIn("Arg1", st)
  328. finally:
  329. sys.argv = prev2
  330. finally:
  331. sys.argv = prev1
  332. @disable_stdouts
  333. def test_parse_options(self):
  334. cmd = WorkerCommand()
  335. cmd.app = current_app
  336. opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
  337. self.assertEqual(opts.concurrency, 512)
  338. @disable_stdouts
  339. def test_main(self):
  340. p, cd.Worker = cd.Worker, Worker
  341. s, sys.argv = sys.argv, ["celeryd", "--discard"]
  342. try:
  343. celeryd_main()
  344. finally:
  345. cd.Worker = p
  346. sys.argv = s
  347. class test_signal_handlers(AppCase):
  348. class _Worker(object):
  349. stopped = False
  350. terminated = False
  351. logger = current_app.log.get_default_logger()
  352. def stop(self, in_sighandler=False):
  353. self.stopped = True
  354. def terminate(self, in_sighandler=False):
  355. self.terminated = True
  356. def psig(self, fun, *args, **kwargs):
  357. handlers = {}
  358. class Signals(platforms.Signals):
  359. def __setitem__(self, sig, handler):
  360. handlers[sig] = handler
  361. p, platforms.signals = platforms.signals, Signals()
  362. try:
  363. fun(*args, **kwargs)
  364. return handlers
  365. finally:
  366. platforms.signals = p
  367. @disable_stdouts
  368. def test_worker_int_handler(self):
  369. worker = self._Worker()
  370. handlers = self.psig(cd.install_worker_int_handler, worker)
  371. next_handlers = {}
  372. class Signals(platforms.Signals):
  373. def __setitem__(self, sig, handler):
  374. next_handlers[sig] = handler
  375. p, platforms.signals = platforms.signals, Signals()
  376. try:
  377. self.assertRaises(SystemExit, handlers["SIGINT"],
  378. "SIGINT", object())
  379. self.assertTrue(worker.stopped)
  380. finally:
  381. platforms.signals = p
  382. self.assertRaises(SystemExit, next_handlers["SIGINT"],
  383. "SIGINT", object())
  384. self.assertTrue(worker.terminated)
  385. @disable_stdouts
  386. def test_worker_int_handler_only_stop_MainProcess(self):
  387. if current_process is None:
  388. raise SkipTest("only relevant for multiprocessing")
  389. process = current_process()
  390. name, process.name = process.name, "OtherProcess"
  391. try:
  392. worker = self._Worker()
  393. handlers = self.psig(cd.install_worker_int_handler, worker)
  394. self.assertRaises(SystemExit, handlers["SIGINT"],
  395. "SIGINT", object())
  396. self.assertFalse(worker.stopped)
  397. finally:
  398. process.name = name
  399. @disable_stdouts
  400. def test_install_HUP_not_supported_handler(self):
  401. worker = self._Worker()
  402. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  403. handlers["SIGHUP"]("SIGHUP", object())
  404. @disable_stdouts
  405. def test_worker_int_again_handler_only_stop_MainProcess(self):
  406. if current_process is None:
  407. raise SkipTest("only relevant for multiprocessing")
  408. process = current_process()
  409. name, process.name = process.name, "OtherProcess"
  410. try:
  411. worker = self._Worker()
  412. handlers = self.psig(cd.install_worker_int_again_handler, worker)
  413. self.assertRaises(SystemExit, handlers["SIGINT"],
  414. "SIGINT", object())
  415. self.assertFalse(worker.terminated)
  416. finally:
  417. process.name = name
  418. @disable_stdouts
  419. def test_worker_term_handler(self):
  420. worker = self._Worker()
  421. handlers = self.psig(cd.install_worker_term_handler, worker)
  422. self.assertRaises(SystemExit, handlers["SIGTERM"],
  423. "SIGTERM", object())
  424. self.assertTrue(worker.stopped)
  425. def test_worker_cry_handler(self):
  426. if sys.platform.startswith("java"):
  427. raise SkipTest("Cry handler does not work on Jython")
  428. if hasattr(sys, "pypy_version_info"):
  429. raise SkipTest("Cry handler does not work on PyPy")
  430. if sys.version_info > (2, 5):
  431. class Logger(object):
  432. _errors = []
  433. def error(self, msg, *args, **kwargs):
  434. self._errors.append(msg)
  435. logger = Logger()
  436. handlers = self.psig(cd.install_cry_handler, logger)
  437. self.assertIsNone(handlers["SIGUSR1"]("SIGUSR1", object()))
  438. self.assertTrue(Logger._errors)
  439. else:
  440. raise SkipTest("Needs Python 2.5 or later")
  441. @disable_stdouts
  442. def test_worker_term_handler_only_stop_MainProcess(self):
  443. if current_process is None:
  444. raise SkipTest("only relevant for multiprocessing")
  445. process = current_process()
  446. name, process.name = process.name, "OtherProcess"
  447. try:
  448. worker = self._Worker()
  449. handlers = self.psig(cd.install_worker_term_handler, worker)
  450. self.assertRaises(SystemExit, handlers["SIGTERM"],
  451. "SIGTERM", object())
  452. self.assertFalse(worker.stopped)
  453. finally:
  454. process.name = name
  455. @disable_stdouts
  456. def test_worker_restart_handler(self):
  457. if getattr(os, "execv", None) is None:
  458. raise SkipTest("platform does not have excv")
  459. argv = []
  460. def _execv(*args):
  461. argv.extend(args)
  462. execv, os.execv = os.execv, _execv
  463. try:
  464. worker = self._Worker()
  465. handlers = self.psig(cd.install_worker_restart_handler, worker)
  466. handlers["SIGHUP"]("SIGHUP", object())
  467. self.assertTrue(worker.stopped)
  468. self.assertTrue(argv)
  469. finally:
  470. os.execv = execv