test_beat.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import logging
  2. import pytest
  3. import sys
  4. from case import Mock, mock, patch
  5. from celery import beat
  6. from celery import platforms
  7. from celery.bin import beat as beat_bin
  8. from celery.apps import beat as beatapp
  9. def MockBeat(*args, **kwargs):
  10. class _Beat(beatapp.Beat):
  11. Service = Mock(
  12. name='MockBeat.Service',
  13. return_value=Mock(name='MockBeat()', max_interval=3.3),
  14. )
  15. b = _Beat(*args, **kwargs)
  16. sched = b.Service.return_value.get_scheduler = Mock()
  17. sched.return_value.max_interval = 3.3
  18. return b
  19. class test_Beat:
  20. def test_loglevel_string(self):
  21. b = beatapp.Beat(app=self.app, loglevel='DEBUG',
  22. redirect_stdouts=False)
  23. assert b.loglevel == logging.DEBUG
  24. b2 = beatapp.Beat(app=self.app, loglevel=logging.DEBUG,
  25. redirect_stdouts=False)
  26. assert b2.loglevel == logging.DEBUG
  27. def test_colorize(self):
  28. self.app.log.setup = Mock()
  29. b = beatapp.Beat(app=self.app, no_color=True,
  30. redirect_stdouts=False)
  31. b.setup_logging()
  32. self.app.log.setup.assert_called()
  33. assert not self.app.log.setup.call_args[1]['colorize']
  34. def test_init_loader(self):
  35. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  36. b.init_loader()
  37. def test_process_title(self):
  38. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  39. b.set_process_title()
  40. def test_run(self):
  41. b = MockBeat(app=self.app, redirect_stdouts=False)
  42. b.install_sync_handler = Mock(name='beat.install_sync_handler')
  43. b.Service.return_value.max_interval = 3.0
  44. b.run()
  45. b.Service().start.assert_called_with()
  46. def psig(self, fun, *args, **kwargs):
  47. handlers = {}
  48. class Signals(platforms.Signals):
  49. def __setitem__(self, sig, handler):
  50. handlers[sig] = handler
  51. p, platforms.signals = platforms.signals, Signals()
  52. try:
  53. fun(*args, **kwargs)
  54. return handlers
  55. finally:
  56. platforms.signals = p
  57. def test_install_sync_handler(self):
  58. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  59. clock = beat.Service(app=self.app)
  60. clock.start = Mock(name='beat.Service().start')
  61. clock.sync = Mock(name='beat.Service().sync')
  62. handlers = self.psig(b.install_sync_handler, clock)
  63. with pytest.raises(SystemExit):
  64. handlers['SIGINT']('SIGINT', object())
  65. clock.sync.assert_called_with()
  66. @mock.restore_logging()
  67. def test_setup_logging(self):
  68. try:
  69. # py3k
  70. delattr(sys.stdout, 'logger')
  71. except AttributeError:
  72. pass
  73. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  74. b.redirect_stdouts = False
  75. b.app.log.already_setup = False
  76. b.setup_logging()
  77. with pytest.raises(AttributeError):
  78. sys.stdout.logger
  79. import sys
  80. orig_stdout = sys.__stdout__
  81. @patch('celery.apps.beat.logger')
  82. def test_logs_errors(self, logger):
  83. b = MockBeat(
  84. app=self.app, redirect_stdouts=False, socket_timeout=None,
  85. )
  86. b.install_sync_handler = Mock('beat.install_sync_handler')
  87. b.install_sync_handler.side_effect = RuntimeError('xxx')
  88. with mock.restore_logging():
  89. with pytest.raises(RuntimeError):
  90. b.start_scheduler()
  91. logger.critical.assert_called()
  92. @patch('celery.platforms.create_pidlock')
  93. def test_using_pidfile(self, create_pidlock):
  94. b = MockBeat(app=self.app, pidfile='pidfilelockfilepid',
  95. socket_timeout=None, redirect_stdouts=False)
  96. b.install_sync_handler = Mock(name='beat.install_sync_handler')
  97. with mock.stdouts():
  98. b.start_scheduler()
  99. create_pidlock.assert_called()
  100. class test_div:
  101. def setup(self):
  102. self.Beat = self.app.Beat = self.patching('celery.apps.beat.Beat')
  103. self.detached = self.patching('celery.bin.beat.detached')
  104. self.Beat.__name__ = 'Beat'
  105. def test_main(self):
  106. sys.argv = [sys.argv[0], '-s', 'foo']
  107. beat_bin.main(app=self.app)
  108. self.Beat().run.assert_called_with()
  109. def test_detach(self):
  110. cmd = beat_bin.beat()
  111. cmd.app = self.app
  112. cmd.run(detach=True)
  113. self.detached.assert_called()
  114. def test_parse_options(self):
  115. cmd = beat_bin.beat()
  116. cmd.app = self.app
  117. options, args = cmd.parse_options('celery beat', ['-s', 'foo'])
  118. assert options['schedule'] == 'foo'