test_celeryd.py 16 KB


  1. from __future__ import with_statement
  2. import logging
  3. import os
  4. import sys
  5. import warnings
  6. from functools import wraps
  7. try:
  8. from multiprocessing import current_process
  9. except ImportError:
  10. current_process = None # noqa
  11. from functools import wraps
  12. from nose import SkipTest
  13. from kombu.tests.utils import redirect_stdouts
  14. from celery import Celery
  15. from celery import platforms
  16. from celery import signals
  17. from celery import current_app
  18. from celery.apps import worker as cd
  19. from celery.bin.celeryd import WorkerCommand, windows_main, \
  20. main as celeryd_main
  21. from celery.exceptions import ImproperlyConfigured
  22. from celery.utils import patch
  23. from celery.tests.compat import catch_warnings
  24. from celery.tests.utils import AppCase, StringIO
  25. patch.ensure_process_aware_logger()
  26. def disable_stdouts(fun):
  27. @wraps(fun)
  28. def disable(*args, **kwargs):
  29. sys.stdout, sys.stderr = StringIO(), StringIO()
  30. try:
  31. return fun(*args, **kwargs)
  32. finally:
  33. sys.stdout = sys.__stdout__
  34. sys.stderr = sys.__stderr__
  35. return disable
  36. class _WorkController(object):
  37. def __init__(self, *args, **kwargs):
  38. self.logger = current_app.log.get_default_logger()
  39. def start(self):
  40. pass
  41. class Worker(cd.Worker):
  42. WorkController = _WorkController
  43. class test_Worker(AppCase):
  44. Worker = Worker
  45. @disable_stdouts
  46. def test_queues_string(self):
  47. celery = Celery(set_as_current=False)
  48. worker = celery.Worker(queues="foo,bar,baz")
  49. worker.init_queues()
  50. self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
  51. self.assertTrue("foo" in celery.amqp.queues)
  52. @disable_stdouts
  53. def test_loglevel_string(self):
  54. worker = self.Worker(loglevel="INFO")
  55. self.assertEqual(worker.loglevel, logging.INFO)
  56. def test_run_worker(self):
  57. handlers = {}
  58. class Signals(platforms.Signals):
  59. def __setitem__(self, sig, handler):
  60. handlers[sig] = handler
  61. p = platforms.signals
  62. platforms.signals = Signals()
  63. try:
  64. w = self.Worker()
  65. w._isatty = False
  66. w.run_worker()
  67. for sig in "SIGINT", "SIGHUP", "SIGTERM":
  68. self.assertIn(sig, handlers)
  69. handlers.clear()
  70. w = self.Worker()
  71. w._isatty = True
  72. w.run_worker()
  73. for sig in "SIGINT", "SIGTERM":
  74. self.assertIn(sig, handlers)
  75. self.assertNotIn("SIGHUP", handlers)
  76. finally:
  77. platforms.signals = p
  78. @disable_stdouts
  79. def test_startup_info(self):
  80. worker = self.Worker()
  81. worker.run()
  82. self.assertTrue(worker.startup_info())
  83. worker.loglevel = logging.DEBUG
  84. self.assertTrue(worker.startup_info())
  85. worker.loglevel = logging.INFO
  86. self.assertTrue(worker.startup_info())
  87. @disable_stdouts
  88. def test_run(self):
  89. self.Worker().run()
  90. self.Worker(discard=True).run()
  91. worker = self.Worker()
  92. worker.init_loader()
  93. worker.run()
  94. @disable_stdouts
  95. def test_purge_messages(self):
  96. self.Worker().purge_messages()
  97. @disable_stdouts
  98. def test_init_queues(self):
  99. app = current_app
  100. c = app.conf
  101. p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
  102. "celery": {"exchange": "celery",
  103. "binding_key": "celery"},
  104. "video": {"exchange": "video",
  105. "binding_key": "video"}})
  106. try:
  107. worker = self.Worker(queues=["video"])
  108. worker.init_queues()
  109. self.assertIn("video", app.amqp.queues)
  110. self.assertIn("video", app.amqp.queues.consume_from)
  111. self.assertIn("celery", app.amqp.queues)
  112. self.assertNotIn("celery", app.amqp.queues.consume_from)
  113. c.CELERY_CREATE_MISSING_QUEUES = False
  114. self.assertRaises(ImproperlyConfigured,
  115. self.Worker(queues=["image"]).init_queues)
  116. c.CELERY_CREATE_MISSING_QUEUES = True
  117. worker = self.Worker(queues=["image"])
  118. worker.init_queues()
  119. self.assertIn("image", app.amqp.queues.consume_from)
  120. self.assertDictContainsSubset({"exchange": "image",
  121. "routing_key": "image",
  122. "binding_key": "image",
  123. "exchange_type": "direct"},
  124. app.amqp.queues["image"])
  125. finally:
  126. app.amqp.queues = p
  127. @disable_stdouts
  128. def test_autoscale_argument(self):
  129. worker1 = self.Worker(autoscale="10,3")
  130. self.assertListEqual(worker1.autoscale, [10, 3])
  131. worker2 = self.Worker(autoscale="10")
  132. self.assertListEqual(worker2.autoscale, [10, 0])
  133. def test_include_argument(self):
  134. worker1 = self.Worker(include="some.module")
  135. self.assertListEqual(worker1.include, ["some.module"])
  136. worker2 = self.Worker(include="some.module,another.package")
  137. self.assertListEqual(worker2.include, ["some.module",
  138. "another.package"])
  139. worker3 = self.Worker(include="os,sys")
  140. worker3.init_loader()
  141. @disable_stdouts
  142. def test_unknown_loglevel(self):
  143. self.assertRaises(SystemExit, self.Worker, loglevel="ALIEN")
  144. worker1 = self.Worker(loglevel=0xFFFF)
  145. self.assertEqual(worker1.loglevel, 0xFFFF)
  146. def test_warns_if_running_as_privileged_user(self):
  147. app = current_app
  148. if app.IS_WINDOWS:
  149. raise SkipTest("Not applicable on Windows")
  150. warnings.resetwarnings()
  151. def geteuid():
  152. return 0
  153. prev, os.geteuid = os.geteuid, geteuid
  154. try:
  155. with catch_warnings(record=True) as log:
  156. worker = self.Worker()
  157. worker.run()
  158. self.assertTrue(log)
  159. self.assertIn("superuser privileges is not encouraged",
  160. log[0].message.args[0])
  161. finally:
  162. os.geteuid = prev
  163. @disable_stdouts
  164. def test_use_pidfile(self):
  165. from celery import platforms
  166. class create_pidlock(object):
  167. instance = [None]
  168. def __init__(self, file):
  169. self.file = file
  170. self.instance[0] = self
  171. def acquire(self):
  172. self.acquired = True
  173. class Object(object):
  174. def release(self):
  175. pass
  176. return Object()
  177. prev, platforms.create_pidlock = platforms.create_pidlock, \
  178. create_pidlock
  179. try:
  180. worker = self.Worker(pidfile="pidfilelockfilepid")
  181. worker.run_worker()
  182. self.assertTrue(create_pidlock.instance[0].acquired)
  183. finally:
  184. platforms.create_pidlock = prev
  185. @disable_stdouts
  186. def test_redirect_stdouts(self):
  187. worker = self.Worker()
  188. worker.redirect_stdouts = False
  189. worker.redirect_stdouts_to_logger()
  190. self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
  191. def test_redirect_stdouts_already_handled(self):
  192. logging_setup = [False]
  193. def on_logging_setup(**kwargs):
  194. logging_setup[0] = True
  195. signals.setup_logging.connect(on_logging_setup)
  196. try:
  197. worker = self.Worker()
  198. worker.app.log.__class__._setup = False
  199. worker.redirect_stdouts_to_logger()
  200. self.assertTrue(logging_setup[0])
  201. self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
  202. finally:
  203. signals.setup_logging.disconnect(on_logging_setup)
  204. @disable_stdouts
  205. def test_platform_tweaks_osx(self):
  206. class OSXWorker(self.Worker):
  207. proxy_workaround_installed = False
  208. def osx_proxy_detection_workaround(self):
  209. self.proxy_workaround_installed = True
  210. worker = OSXWorker()
  211. def install_HUP_nosupport(controller):
  212. controller.hup_not_supported_installed = True
  213. class Controller(object):
  214. logger = logging.getLogger("celery.tests")
  215. prev = cd.install_HUP_not_supported_handler
  216. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  217. try:
  218. worker.app.IS_OSX = True
  219. controller = Controller()
  220. worker.install_platform_tweaks(controller)
  221. self.assertTrue(controller.hup_not_supported_installed)
  222. self.assertTrue(worker.proxy_workaround_installed)
  223. finally:
  224. cd.install_HUP_not_supported_handler = prev
  225. @disable_stdouts
  226. def test_general_platform_tweaks(self):
  227. restart_worker_handler_installed = [False]
  228. def install_worker_restart_handler(worker):
  229. restart_worker_handler_installed[0] = True
  230. class Controller(object):
  231. logger = logging.getLogger("celery.tests")
  232. prev = cd.install_worker_restart_handler
  233. cd.install_worker_restart_handler = install_worker_restart_handler
  234. try:
  235. worker = self.Worker()
  236. worker.app.IS_OSX = False
  237. worker.install_platform_tweaks(Controller())
  238. self.assertTrue(restart_worker_handler_installed[0])
  239. finally:
  240. cd.install_worker_restart_handler = prev
  241. @disable_stdouts
  242. def test_on_consumer_ready(self):
  243. worker_ready_sent = [False]
  244. def on_worker_ready(**kwargs):
  245. worker_ready_sent[0] = True
  246. signals.worker_ready.connect(on_worker_ready)
  247. self.Worker().on_consumer_ready(object())
  248. self.assertTrue(worker_ready_sent[0])
  249. class test_funs(AppCase):
  250. @redirect_stdouts
  251. def test_windows_main(self, stdout, stderr):
  252. windows_main()
  253. self.assertIn("celeryd command does not work on Windows",
  254. stderr.getvalue())
  255. @disable_stdouts
  256. def test_set_process_status(self):
  257. try:
  258. __import__("setproctitle")
  259. except ImportError:
  260. raise SkipTest("setproctitle not installed")
  261. worker = Worker(hostname="xyzza")
  262. prev1, sys.argv = sys.argv, ["Arg0"]
  263. try:
  264. st = worker.set_process_status("Running")
  265. self.assertIn("celeryd", st)
  266. self.assertIn("xyzza", st)
  267. self.assertIn("Running", st)
  268. prev2, sys.argv = sys.argv, ["Arg0", "Arg1"]
  269. try:
  270. st = worker.set_process_status("Running")
  271. self.assertIn("celeryd", st)
  272. self.assertIn("xyzza", st)
  273. self.assertIn("Running", st)
  274. self.assertIn("Arg1", st)
  275. finally:
  276. sys.argv = prev2
  277. finally:
  278. sys.argv = prev1
  279. @disable_stdouts
  280. def test_parse_options(self):
  281. cmd = WorkerCommand()
  282. cmd.app = current_app
  283. opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
  284. self.assertEqual(opts.concurrency, 512)
  285. @disable_stdouts
  286. def test_main(self):
  287. p, cd.Worker = cd.Worker, Worker
  288. s, sys.argv = sys.argv, ["celeryd", "--discard"]
  289. try:
  290. celeryd_main()
  291. finally:
  292. cd.Worker = p
  293. sys.argv = s
  294. class test_signal_handlers(AppCase):
  295. class _Worker(object):
  296. stopped = False
  297. terminated = False
  298. logger = current_app.log.get_default_logger()
  299. def stop(self, in_sighandler=False):
  300. self.stopped = True
  301. def terminate(self, in_sighandler=False):
  302. self.terminated = True
  303. def psig(self, fun, *args, **kwargs):
  304. handlers = {}
  305. class Signals(platforms.Signals):
  306. def __setitem__(self, sig, handler):
  307. handlers[sig] = handler
  308. p, platforms.signals = platforms.signals, Signals()
  309. try:
  310. fun(*args, **kwargs)
  311. return handlers
  312. finally:
  313. platforms.signals = p
  314. @disable_stdouts
  315. def test_worker_int_handler(self):
  316. worker = self._Worker()
  317. handlers = self.psig(cd.install_worker_int_handler, worker)
  318. next_handlers = {}
  319. class Signals(platforms.Signals):
  320. def __setitem__(self, sig, handler):
  321. next_handlers[sig] = handler
  322. p, platforms.signals = platforms.signals, Signals()
  323. try:
  324. self.assertRaises(SystemExit, handlers["SIGINT"],
  325. "SIGINT", object())
  326. self.assertTrue(worker.stopped)
  327. finally:
  328. platforms.signals = p
  329. self.assertRaises(SystemExit, next_handlers["SIGINT"],
  330. "SIGINT", object())
  331. self.assertTrue(worker.terminated)
  332. @disable_stdouts
  333. def test_worker_int_handler_only_stop_MainProcess(self):
  334. if current_process is None:
  335. raise SkipTest("only relevant for multiprocessing")
  336. process = current_process()
  337. name, process.name = process.name, "OtherProcess"
  338. try:
  339. worker = self._Worker()
  340. handlers = self.psig(cd.install_worker_int_handler, worker)
  341. self.assertRaises(SystemExit, handlers["SIGINT"],
  342. "SIGINT", object())
  343. self.assertFalse(worker.stopped)
  344. finally:
  345. process.name = name
  346. @disable_stdouts
  347. def test_install_HUP_not_supported_handler(self):
  348. worker = self._Worker()
  349. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  350. handlers["SIGHUP"]("SIGHUP", object())
  351. @disable_stdouts
  352. def test_worker_int_again_handler_only_stop_MainProcess(self):
  353. if current_process is None:
  354. raise SkipTest("only relevant for multiprocessing")
  355. process = current_process()
  356. name, process.name = process.name, "OtherProcess"
  357. try:
  358. worker = self._Worker()
  359. handlers = self.psig(cd.install_worker_int_again_handler, worker)
  360. self.assertRaises(SystemExit, handlers["SIGINT"],
  361. "SIGINT", object())
  362. self.assertFalse(worker.terminated)
  363. finally:
  364. process.name = name
  365. @disable_stdouts
  366. def test_worker_term_handler(self):
  367. worker = self._Worker()
  368. handlers = self.psig(cd.install_worker_term_handler, worker)
  369. self.assertRaises(SystemExit, handlers["SIGTERM"],
  370. "SIGTERM", object())
  371. self.assertTrue(worker.stopped)
  372. def test_worker_cry_handler(self):
  373. if sys.platform.startswith("java"):
  374. raise SkipTest("Cry handler does not work on Jython")
  375. if hasattr(sys, "pypy_version_info"):
  376. raise SkipTest("Cry handler does not work on PyPy")
  377. if sys.version_info > (2, 5):
  378. class Logger(object):
  379. _errors = []
  380. def error(self, msg, *args, **kwargs):
  381. self._errors.append(msg)
  382. logger = Logger()
  383. handlers = self.psig(cd.install_cry_handler, logger)
  384. self.assertIsNone(handlers["SIGUSR1"]("SIGUSR1", object()))
  385. self.assertTrue(Logger._errors)
  386. else:
  387. raise SkipTest("Needs Python 2.5 or later")
  388. @disable_stdouts
  389. def test_worker_term_handler_only_stop_MainProcess(self):
  390. if current_process is None:
  391. raise SkipTest("only relevant for multiprocessing")
  392. process = current_process()
  393. name, process.name = process.name, "OtherProcess"
  394. try:
  395. worker = self._Worker()
  396. handlers = self.psig(cd.install_worker_term_handler, worker)
  397. self.assertRaises(SystemExit, handlers["SIGTERM"],
  398. "SIGTERM", object())
  399. self.assertFalse(worker.stopped)
  400. finally:
  401. process.name = name
  402. @disable_stdouts
  403. def test_worker_restart_handler(self):
  404. if getattr(os, "execv", None) is None:
  405. raise SkipTest("platform does not have excv")
  406. argv = []
  407. def _execv(*args):
  408. argv.extend(args)
  409. execv, os.execv = os.execv, _execv
  410. try:
  411. worker = self._Worker()
  412. handlers = self.psig(cd.install_worker_restart_handler, worker)
  413. handlers["SIGHUP"]("SIGHUP", object())
  414. self.assertTrue(worker.stopped)
  415. self.assertTrue(argv)
  416. finally:
  417. os.execv = execv