test_beat.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. from __future__ import absolute_import, unicode_literals
  2. import logging
  3. import sys
  4. from celery import beat
  5. from celery import platforms
  6. from celery.bin import beat as beat_bin
  7. from celery.apps import beat as beatapp
  8. from celery.tests.case import AppCase, Mock, mock, patch
  9. class MockBeat(beatapp.Beat):
  10. Service = Mock(
  11. name='MockBeat.Service',
  12. return_value=Mock(name='MockBeat()', max_interval=3.3),
  13. )
  14. class test_Beat(AppCase):
  15. def test_loglevel_string(self):
  16. b = beatapp.Beat(app=self.app, loglevel='DEBUG',
  17. redirect_stdouts=False)
  18. self.assertEqual(b.loglevel, logging.DEBUG)
  19. b2 = beatapp.Beat(app=self.app, loglevel=logging.DEBUG,
  20. redirect_stdouts=False)
  21. self.assertEqual(b2.loglevel, logging.DEBUG)
  22. def test_colorize(self):
  23. self.app.log.setup = Mock()
  24. b = beatapp.Beat(app=self.app, no_color=True,
  25. redirect_stdouts=False)
  26. b.setup_logging()
  27. self.app.log.setup.assert_called()
  28. self.assertEqual(self.app.log.setup.call_args[1]['colorize'], False)
  29. def test_init_loader(self):
  30. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  31. b.init_loader()
  32. def test_process_title(self):
  33. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  34. b.set_process_title()
  35. def test_run(self):
  36. b = MockBeat(app=self.app, redirect_stdouts=False)
  37. b.install_sync_handler = Mock(name='beat.install_sync_handler')
  38. b.Service.return_value.max_interval = 3.0
  39. b.run()
  40. b.Service().start.assert_called_with()
  41. def psig(self, fun, *args, **kwargs):
  42. handlers = {}
  43. class Signals(platforms.Signals):
  44. def __setitem__(self, sig, handler):
  45. handlers[sig] = handler
  46. p, platforms.signals = platforms.signals, Signals()
  47. try:
  48. fun(*args, **kwargs)
  49. return handlers
  50. finally:
  51. platforms.signals = p
  52. def test_install_sync_handler(self):
  53. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  54. clock = beat.Service(app=self.app)
  55. clock.start = Mock(name='beat.Service().start')
  56. clock.sync = Mock(name='beat.Service().sync')
  57. handlers = self.psig(b.install_sync_handler, clock)
  58. with self.assertRaises(SystemExit):
  59. handlers['SIGINT']('SIGINT', object())
  60. clock.sync.assert_called_with()
  61. @mock.restore_logging()
  62. def test_setup_logging(self):
  63. try:
  64. # py3k
  65. delattr(sys.stdout, 'logger')
  66. except AttributeError:
  67. pass
  68. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  69. b.redirect_stdouts = False
  70. b.app.log.already_setup = False
  71. b.setup_logging()
  72. with self.assertRaises(AttributeError):
  73. sys.stdout.logger
  74. import sys
  75. orig_stdout = sys.__stdout__
  76. @patch('celery.apps.beat.logger')
  77. @mock.restore_logging()
  78. @mock.stdouts
  79. def test_logs_errors(self, logger, stdout, stderr):
  80. b = MockBeat(
  81. app=self.app, redirect_stdouts=False, socket_timeout=None,
  82. )
  83. b.install_sync_handler = Mock('beat.install_sync_handler')
  84. b.install_sync_handler.side_effect = RuntimeError('xxx')
  85. with self.assertRaises(RuntimeError):
  86. b.start_scheduler()
  87. logger.critical.assert_called()
  88. @patch('celery.platforms.create_pidlock')
  89. @mock.stdouts
  90. def test_using_pidfile(self, create_pidlock, stdout, stderr):
  91. b = MockBeat(app=self.app, pidfile='pidfilelockfilepid',
  92. socket_timeout=None, redirect_stdouts=False)
  93. b.install_sync_handler = Mock(name='beat.install_sync_handler')
  94. b.start_scheduler()
  95. create_pidlock.assert_called()
  96. class test_div(AppCase):
  97. def setup(self):
  98. self.Beat = self.app.Beat = self.patch('celery.apps.beat.Beat')
  99. self.detached = self.patch('celery.bin.beat.detached')
  100. self.Beat.__name__ = 'Beat'
  101. def test_main(self):
  102. sys.argv = [sys.argv[0], '-s', 'foo']
  103. beat_bin.main(app=self.app)
  104. self.Beat().run.assert_called_with()
  105. def test_detach(self):
  106. cmd = beat_bin.beat()
  107. cmd.app = self.app
  108. cmd.run(detach=True)
  109. self.detached.assert_called()
  110. def test_parse_options(self):
  111. cmd = beat_bin.beat()
  112. cmd.app = self.app
  113. options, args = cmd.parse_options('celery beat', ['-s', 'foo'])
  114. self.assertEqual(options.schedule, 'foo')