test_celeryd.py 22 KB


  1. from __future__ import absolute_import
  2. import logging
  3. import os
  4. import sys
  5. from functools import wraps
  6. from mock import Mock, patch
  7. from nose import SkipTest
  8. from billiard import current_process
  9. from kombu import Exchange, Queue
  10. from celery import Celery
  11. from celery import platforms
  12. from celery import signals
  13. from celery import current_app
  14. from celery.apps import worker as cd
  15. from celery.bin.celeryd import WorkerCommand, main as celeryd_main
  16. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  17. from celery.utils.log import ensure_process_aware_logger
  18. from celery.worker import state
  19. from celery.tests.utils import (
  20. AppCase,
  21. WhateverIO,
  22. skip_if_pypy,
  23. skip_if_jython,
  24. )
  25. ensure_process_aware_logger()
  26. def disable_stdouts(fun):
  27. @wraps(fun)
  28. def disable(*args, **kwargs):
  29. prev_out, prev_err = sys.stdout, sys.stderr
  30. prev_rout, prev_rerr = sys.__stdout__, sys.__stderr__
  31. sys.stdout = sys.__stdout__ = WhateverIO()
  32. sys.stderr = sys.__stderr__ = WhateverIO()
  33. try:
  34. return fun(*args, **kwargs)
  35. finally:
  36. sys.stdout = prev_out
  37. sys.stderr = prev_err
  38. sys.__stdout__ = prev_rout
  39. sys.__stderr__ = prev_rerr
  40. return disable
  41. class Worker(cd.Worker):
  42. def __init__(self, *args, **kwargs):
  43. super(Worker, self).__init__(*args, **kwargs)
  44. self.redirect_stdouts = False
  45. def start(self, *args, **kwargs):
  46. self.on_start()
  47. class test_Worker(AppCase):
  48. Worker = Worker
  49. def teardown(self):
  50. self.app.conf.CELERY_INCLUDE = ()
  51. @disable_stdouts
  52. def test_queues_string(self):
  53. celery = Celery(set_as_current=False)
  54. worker = celery.Worker()
  55. worker.setup_queues('foo,bar,baz')
  56. self.assertEqual(worker.queues, ['foo', 'bar', 'baz'])
  57. self.assertTrue('foo' in celery.amqp.queues)
  58. @disable_stdouts
  59. def test_cpu_count(self):
  60. celery = Celery(set_as_current=False)
  61. with patch('celery.worker.cpu_count') as cpu_count:
  62. cpu_count.side_effect = NotImplementedError()
  63. worker = celery.Worker(concurrency=None)
  64. self.assertEqual(worker.concurrency, 2)
  65. worker = celery.Worker(concurrency=5)
  66. self.assertEqual(worker.concurrency, 5)
  67. @disable_stdouts
  68. def test_windows_B_option(self):
  69. celery = Celery(set_as_current=False)
  70. celery.IS_WINDOWS = True
  71. with self.assertRaises(SystemExit):
  72. WorkerCommand(app=celery).run(beat=True)
  73. def test_setup_concurrency_very_early(self):
  74. x = WorkerCommand()
  75. x.run = Mock()
  76. with self.assertRaises(ImportError):
  77. x.execute_from_commandline(['celeryd', '-P', 'xyzybox'])
  78. @disable_stdouts
  79. def test_invalid_loglevel_gives_error(self):
  80. x = WorkerCommand(app=Celery(set_as_current=False))
  81. with self.assertRaises(SystemExit):
  82. x.run(loglevel='GRIM_REAPER')
  83. def test_no_loglevel(self):
  84. app = Celery(set_as_current=False)
  85. app.Worker = Mock()
  86. WorkerCommand(app=app).run(loglevel=None)
  87. def test_tasklist(self):
  88. celery = Celery(set_as_current=False)
  89. worker = celery.Worker()
  90. self.assertTrue(worker.app.tasks)
  91. self.assertTrue(worker.app.finalized)
  92. self.assertTrue(worker.tasklist(include_builtins=True))
  93. worker.tasklist(include_builtins=False)
  94. def test_extra_info(self):
  95. celery = Celery(set_as_current=False)
  96. worker = celery.Worker()
  97. worker.loglevel = logging.WARNING
  98. self.assertFalse(worker.extra_info())
  99. worker.loglevel = logging.INFO
  100. self.assertTrue(worker.extra_info())
  101. @disable_stdouts
  102. def test_loglevel_string(self):
  103. worker = self.Worker(loglevel='INFO')
  104. self.assertEqual(worker.loglevel, logging.INFO)
  105. def test_run_worker(self):
  106. handlers = {}
  107. class Signals(platforms.Signals):
  108. def __setitem__(self, sig, handler):
  109. handlers[sig] = handler
  110. p = platforms.signals
  111. platforms.signals = Signals()
  112. try:
  113. w = self.Worker()
  114. w._isatty = False
  115. w.on_start()
  116. for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
  117. self.assertIn(sig, handlers)
  118. handlers.clear()
  119. w = self.Worker()
  120. w._isatty = True
  121. w.on_start()
  122. for sig in 'SIGINT', 'SIGTERM':
  123. self.assertIn(sig, handlers)
  124. self.assertNotIn('SIGHUP', handlers)
  125. finally:
  126. platforms.signals = p
  127. @disable_stdouts
  128. def test_startup_info(self):
  129. worker = self.Worker()
  130. worker.on_start()
  131. self.assertTrue(worker.startup_info())
  132. worker.loglevel = logging.DEBUG
  133. self.assertTrue(worker.startup_info())
  134. worker.loglevel = logging.INFO
  135. self.assertTrue(worker.startup_info())
  136. worker.autoscale = 13, 10
  137. self.assertTrue(worker.startup_info())
  138. app = Celery(set_as_current=False)
  139. worker = self.Worker(app=app, queues='foo,bar,baz,xuzzy,do,re,mi')
  140. prev, app.loader = app.loader, Mock()
  141. try:
  142. app.loader.__module__ = 'acme.baked_beans'
  143. self.assertTrue(worker.startup_info())
  144. finally:
  145. app.loader = prev
  146. prev, app.loader = app.loader, Mock()
  147. try:
  148. app.loader.__module__ = 'celery.loaders.foo'
  149. self.assertTrue(worker.startup_info())
  150. finally:
  151. app.loader = prev
  152. from celery.loaders.app import AppLoader
  153. prev, app.loader = app.loader, AppLoader()
  154. try:
  155. self.assertTrue(worker.startup_info())
  156. finally:
  157. app.loader = prev
  158. worker.send_events = True
  159. self.assertTrue(worker.startup_info())
  160. # test when there are too few output lines
  161. # to draft the ascii art onto
  162. prev, cd.ARTLINES = (cd.ARTLINES,
  163. ['the quick brown fox'])
  164. self.assertTrue(worker.startup_info())
  165. @disable_stdouts
  166. def test_run(self):
  167. self.Worker().on_start()
  168. self.Worker(purge=True).on_start()
  169. worker = self.Worker()
  170. worker.on_start()
  171. @disable_stdouts
  172. def test_purge_messages(self):
  173. self.Worker().purge_messages()
  174. @disable_stdouts
  175. def test_init_queues(self):
  176. app = current_app
  177. c = app.conf
  178. p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
  179. 'celery': {'exchange': 'celery',
  180. 'routing_key': 'celery'},
  181. 'video': {'exchange': 'video',
  182. 'routing_key': 'video'}})
  183. try:
  184. worker = self.Worker()
  185. worker.setup_queues(['video'])
  186. self.assertIn('video', app.amqp.queues)
  187. self.assertIn('video', app.amqp.queues.consume_from)
  188. self.assertIn('celery', app.amqp.queues)
  189. self.assertNotIn('celery', app.amqp.queues.consume_from)
  190. c.CELERY_CREATE_MISSING_QUEUES = False
  191. del(app.amqp.queues)
  192. with self.assertRaises(ImproperlyConfigured):
  193. self.Worker().setup_queues(['image'])
  194. del(app.amqp.queues)
  195. c.CELERY_CREATE_MISSING_QUEUES = True
  196. worker = self.Worker()
  197. worker.setup_queues(queues=['image'])
  198. self.assertIn('image', app.amqp.queues.consume_from)
  199. self.assertEqual(Queue('image', Exchange('image'),
  200. routing_key='image'), app.amqp.queues['image'])
  201. finally:
  202. app.amqp.queues = p
  203. @disable_stdouts
  204. def test_autoscale_argument(self):
  205. worker1 = self.Worker(autoscale='10,3')
  206. self.assertListEqual(worker1.autoscale, [10, 3])
  207. worker2 = self.Worker(autoscale='10')
  208. self.assertListEqual(worker2.autoscale, [10, 0])
  209. def test_include_argument(self):
  210. worker1 = self.Worker(include='some.module')
  211. self.assertListEqual(worker1.include, ['some.module'])
  212. worker2 = self.Worker(include='some.module,another.package')
  213. self.assertListEqual(worker2.include,
  214. ['some.module', 'another.package'])
  215. self.Worker(include=['os', 'sys'])
  216. @disable_stdouts
  217. def test_unknown_loglevel(self):
  218. with self.assertRaises(SystemExit):
  219. WorkerCommand(app=self.app).run(loglevel='ALIEN')
  220. worker1 = self.Worker(loglevel=0xFFFF)
  221. self.assertEqual(worker1.loglevel, 0xFFFF)
  222. def test_warns_if_running_as_privileged_user(self):
  223. app = current_app
  224. if app.IS_WINDOWS:
  225. raise SkipTest('Not applicable on Windows')
  226. def getuid():
  227. return 0
  228. prev, os.getuid = os.getuid, getuid
  229. try:
  230. with self.assertWarnsRegex(RuntimeWarning,
  231. r'superuser privileges is discouraged'):
  232. worker = self.Worker()
  233. worker.on_start()
  234. finally:
  235. os.getuid = prev
  236. @disable_stdouts
  237. def test_redirect_stdouts(self):
  238. worker = self.Worker()
  239. worker.redirect_stdouts = False
  240. worker.redirect_stdouts_to_logger()
  241. with self.assertRaises(AttributeError):
  242. sys.stdout.logger
  243. def test_redirect_stdouts_already_handled(self):
  244. logging_setup = [False]
  245. @signals.setup_logging.connect
  246. def on_logging_setup(**kwargs):
  247. logging_setup[0] = True
  248. try:
  249. worker = self.Worker()
  250. worker.app.log.__class__._setup = False
  251. worker.redirect_stdouts_to_logger()
  252. self.assertTrue(logging_setup[0])
  253. with self.assertRaises(AttributeError):
  254. sys.stdout.logger
  255. finally:
  256. signals.setup_logging.disconnect(on_logging_setup)
  257. @disable_stdouts
  258. def test_platform_tweaks_osx(self):
  259. class OSXWorker(Worker):
  260. proxy_workaround_installed = False
  261. def osx_proxy_detection_workaround(self):
  262. self.proxy_workaround_installed = True
  263. worker = OSXWorker(redirect_stdouts=False)
  264. def install_HUP_nosupport(controller):
  265. controller.hup_not_supported_installed = True
  266. class Controller(object):
  267. pass
  268. prev = cd.install_HUP_not_supported_handler
  269. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  270. try:
  271. worker.app.IS_OSX = True
  272. controller = Controller()
  273. worker.install_platform_tweaks(controller)
  274. self.assertTrue(controller.hup_not_supported_installed)
  275. self.assertTrue(worker.proxy_workaround_installed)
  276. finally:
  277. cd.install_HUP_not_supported_handler = prev
  278. @disable_stdouts
  279. def test_general_platform_tweaks(self):
  280. restart_worker_handler_installed = [False]
  281. def install_worker_restart_handler(worker):
  282. restart_worker_handler_installed[0] = True
  283. class Controller(object):
  284. pass
  285. prev = cd.install_worker_restart_handler
  286. cd.install_worker_restart_handler = install_worker_restart_handler
  287. try:
  288. worker = self.Worker()
  289. worker.app.IS_OSX = False
  290. worker.install_platform_tweaks(Controller())
  291. self.assertTrue(restart_worker_handler_installed[0])
  292. finally:
  293. cd.install_worker_restart_handler = prev
  294. @disable_stdouts
  295. def test_on_consumer_ready(self):
  296. worker_ready_sent = [False]
  297. @signals.worker_ready.connect
  298. def on_worker_ready(**kwargs):
  299. worker_ready_sent[0] = True
  300. self.Worker().on_consumer_ready(object())
  301. self.assertTrue(worker_ready_sent[0])
  302. class test_funs(AppCase):
  303. def test_active_thread_count(self):
  304. self.assertTrue(cd.active_thread_count())
  305. @disable_stdouts
  306. def test_set_process_status(self):
  307. try:
  308. __import__('setproctitle')
  309. except ImportError:
  310. raise SkipTest('setproctitle not installed')
  311. worker = Worker(hostname='xyzza')
  312. prev1, sys.argv = sys.argv, ['Arg0']
  313. try:
  314. st = worker.set_process_status('Running')
  315. self.assertIn('celeryd', st)
  316. self.assertIn('xyzza', st)
  317. self.assertIn('Running', st)
  318. prev2, sys.argv = sys.argv, ['Arg0', 'Arg1']
  319. try:
  320. st = worker.set_process_status('Running')
  321. self.assertIn('celeryd', st)
  322. self.assertIn('xyzza', st)
  323. self.assertIn('Running', st)
  324. self.assertIn('Arg1', st)
  325. finally:
  326. sys.argv = prev2
  327. finally:
  328. sys.argv = prev1
  329. @disable_stdouts
  330. def test_parse_options(self):
  331. cmd = WorkerCommand()
  332. cmd.app = current_app
  333. opts, args = cmd.parse_options('celeryd', ['--concurrency=512'])
  334. self.assertEqual(opts.concurrency, 512)
  335. @disable_stdouts
  336. def test_main(self):
  337. p, cd.Worker = cd.Worker, Worker
  338. s, sys.argv = sys.argv, ['celeryd', '--discard']
  339. try:
  340. celeryd_main()
  341. finally:
  342. cd.Worker = p
  343. sys.argv = s
  344. class test_signal_handlers(AppCase):
  345. class _Worker(object):
  346. stopped = False
  347. terminated = False
  348. def stop(self, in_sighandler=False):
  349. self.stopped = True
  350. def terminate(self, in_sighandler=False):
  351. self.terminated = True
  352. def psig(self, fun, *args, **kwargs):
  353. handlers = {}
  354. class Signals(platforms.Signals):
  355. def __setitem__(self, sig, handler):
  356. handlers[sig] = handler
  357. p, platforms.signals = platforms.signals, Signals()
  358. try:
  359. fun(*args, **kwargs)
  360. return handlers
  361. finally:
  362. platforms.signals = p
  363. @disable_stdouts
  364. def test_worker_int_handler(self):
  365. worker = self._Worker()
  366. handlers = self.psig(cd.install_worker_int_handler, worker)
  367. next_handlers = {}
  368. state.should_stop = False
  369. state.should_terminate = False
  370. class Signals(platforms.Signals):
  371. def __setitem__(self, sig, handler):
  372. next_handlers[sig] = handler
  373. with patch('celery.apps.worker.active_thread_count') as c:
  374. c.return_value = 3
  375. p, platforms.signals = platforms.signals, Signals()
  376. try:
  377. handlers['SIGINT']('SIGINT', object())
  378. self.assertTrue(state.should_stop)
  379. finally:
  380. platforms.signals = p
  381. state.should_stop = False
  382. try:
  383. next_handlers['SIGINT']('SIGINT', object())
  384. self.assertTrue(state.should_terminate)
  385. finally:
  386. state.should_terminate = False
  387. with patch('celery.apps.worker.active_thread_count') as c:
  388. c.return_value = 1
  389. p, platforms.signals = platforms.signals, Signals()
  390. try:
  391. with self.assertRaises(SystemExit):
  392. handlers['SIGINT']('SIGINT', object())
  393. finally:
  394. platforms.signals = p
  395. with self.assertRaises(SystemTerminate):
  396. next_handlers['SIGINT']('SIGINT', object())
  397. @disable_stdouts
  398. def test_worker_int_handler_only_stop_MainProcess(self):
  399. try:
  400. import _multiprocessing # noqa
  401. except ImportError:
  402. raise SkipTest('only relevant for multiprocessing')
  403. process = current_process()
  404. name, process.name = process.name, 'OtherProcess'
  405. with patch('celery.apps.worker.active_thread_count') as c:
  406. c.return_value = 3
  407. try:
  408. worker = self._Worker()
  409. handlers = self.psig(cd.install_worker_int_handler, worker)
  410. handlers['SIGINT']('SIGINT', object())
  411. self.assertTrue(state.should_stop)
  412. finally:
  413. process.name = name
  414. state.should_stop = False
  415. with patch('celery.apps.worker.active_thread_count') as c:
  416. c.return_value = 1
  417. try:
  418. worker = self._Worker()
  419. handlers = self.psig(cd.install_worker_int_handler, worker)
  420. with self.assertRaises(SystemExit):
  421. handlers['SIGINT']('SIGINT', object())
  422. finally:
  423. process.name = name
  424. state.should_stop = False
  425. @disable_stdouts
  426. def test_install_HUP_not_supported_handler(self):
  427. worker = self._Worker()
  428. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  429. handlers['SIGHUP']('SIGHUP', object())
  430. @disable_stdouts
  431. def test_worker_term_hard_handler_only_stop_MainProcess(self):
  432. try:
  433. import _multiprocessing # noqa
  434. except ImportError:
  435. raise SkipTest('only relevant for multiprocessing')
  436. process = current_process()
  437. name, process.name = process.name, 'OtherProcess'
  438. try:
  439. with patch('celery.apps.worker.active_thread_count') as c:
  440. c.return_value = 3
  441. worker = self._Worker()
  442. handlers = self.psig(
  443. cd.install_worker_term_hard_handler, worker)
  444. try:
  445. handlers['SIGQUIT']('SIGQUIT', object())
  446. self.assertTrue(state.should_terminate)
  447. finally:
  448. state.should_terminate = False
  449. with patch('celery.apps.worker.active_thread_count') as c:
  450. c.return_value = 1
  451. worker = self._Worker()
  452. handlers = self.psig(
  453. cd.install_worker_term_hard_handler, worker)
  454. with self.assertRaises(SystemTerminate):
  455. handlers['SIGQUIT']('SIGQUIT', object())
  456. finally:
  457. process.name = name
  458. @disable_stdouts
  459. def test_worker_term_handler_when_threads(self):
  460. with patch('celery.apps.worker.active_thread_count') as c:
  461. c.return_value = 3
  462. worker = self._Worker()
  463. handlers = self.psig(cd.install_worker_term_handler, worker)
  464. try:
  465. handlers['SIGTERM']('SIGTERM', object())
  466. self.assertTrue(state.should_stop)
  467. finally:
  468. state.should_stop = False
  469. @disable_stdouts
  470. def test_worker_term_handler_when_single_thread(self):
  471. with patch('celery.apps.worker.active_thread_count') as c:
  472. c.return_value = 1
  473. worker = self._Worker()
  474. handlers = self.psig(cd.install_worker_term_handler, worker)
  475. try:
  476. with self.assertRaises(SystemExit):
  477. handlers['SIGTERM']('SIGTERM', object())
  478. finally:
  479. state.should_stop = False
  480. @patch('sys.__stderr__')
  481. @skip_if_pypy
  482. @skip_if_jython
  483. def test_worker_cry_handler(self, stderr):
  484. handlers = self.psig(cd.install_cry_handler)
  485. self.assertIsNone(handlers['SIGUSR1']('SIGUSR1', object()))
  486. self.assertTrue(stderr.write.called)
  487. @disable_stdouts
  488. def test_worker_term_handler_only_stop_MainProcess(self):
  489. try:
  490. import _multiprocessing # noqa
  491. except ImportError:
  492. raise SkipTest('only relevant for multiprocessing')
  493. process = current_process()
  494. name, process.name = process.name, 'OtherProcess'
  495. try:
  496. with patch('celery.apps.worker.active_thread_count') as c:
  497. c.return_value = 3
  498. worker = self._Worker()
  499. handlers = self.psig(cd.install_worker_term_handler, worker)
  500. handlers['SIGTERM']('SIGTERM', object())
  501. self.assertTrue(state.should_stop)
  502. with patch('celery.apps.worker.active_thread_count') as c:
  503. c.return_value = 1
  504. worker = self._Worker()
  505. handlers = self.psig(cd.install_worker_term_handler, worker)
  506. with self.assertRaises(SystemExit):
  507. handlers['SIGTERM']('SIGTERM', object())
  508. finally:
  509. process.name = name
  510. state.should_stop = False
  511. @disable_stdouts
  512. @patch('atexit.register')
  513. @patch('os.fork')
  514. def test_worker_restart_handler(self, fork, register):
  515. fork.return_value = 0
  516. if getattr(os, 'execv', None) is None:
  517. raise SkipTest('platform does not have excv')
  518. argv = []
  519. def _execv(*args):
  520. argv.extend(args)
  521. execv, os.execv = os.execv, _execv
  522. try:
  523. worker = self._Worker()
  524. handlers = self.psig(cd.install_worker_restart_handler, worker)
  525. handlers['SIGHUP']('SIGHUP', object())
  526. self.assertTrue(state.should_stop)
  527. self.assertTrue(register.called)
  528. callback = register.call_args[0][0]
  529. callback()
  530. self.assertTrue(argv)
  531. argv[:] = []
  532. fork.return_value = 1
  533. callback()
  534. self.assertFalse(argv)
  535. finally:
  536. os.execv = execv
  537. state.should_stop = False
  538. @disable_stdouts
  539. def test_worker_term_hard_handler_when_threaded(self):
  540. with patch('celery.apps.worker.active_thread_count') as c:
  541. c.return_value = 3
  542. worker = self._Worker()
  543. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  544. try:
  545. handlers['SIGQUIT']('SIGQUIT', object())
  546. self.assertTrue(state.should_terminate)
  547. finally:
  548. state.should_terminate = False
  549. @disable_stdouts
  550. def test_worker_term_hard_handler_when_single_threaded(self):
  551. with patch('celery.apps.worker.active_thread_count') as c:
  552. c.return_value = 1
  553. worker = self._Worker()
  554. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  555. with self.assertRaises(SystemTerminate):
  556. handlers['SIGQUIT']('SIGQUIT', object())