test_beat.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. from __future__ import absolute_import, unicode_literals
  2. import logging
  3. import pytest
  4. import sys
  5. from case import Mock, mock, patch
  6. from celery import beat
  7. from celery import platforms
  8. from celery.bin import beat as beat_bin
  9. from celery.apps import beat as beatapp
  10. def MockBeat(*args, **kwargs):
  11. class _Beat(beatapp.Beat):
  12. Service = Mock(
  13. name='MockBeat.Service',
  14. return_value=Mock(name='MockBeat()', max_interval=3.3),
  15. )
  16. b = _Beat(*args, **kwargs)
  17. sched = b.Service.return_value.get_scheduler = Mock()
  18. sched.return_value.max_interval = 3.3
  19. return b
  20. class test_Beat:
  21. def test_loglevel_string(self):
  22. b = beatapp.Beat(app=self.app, loglevel='DEBUG',
  23. redirect_stdouts=False)
  24. assert b.loglevel == logging.DEBUG
  25. b2 = beatapp.Beat(app=self.app, loglevel=logging.DEBUG,
  26. redirect_stdouts=False)
  27. assert b2.loglevel == logging.DEBUG
  28. def test_colorize(self):
  29. self.app.log.setup = Mock()
  30. b = beatapp.Beat(app=self.app, no_color=True,
  31. redirect_stdouts=False)
  32. b.setup_logging()
  33. self.app.log.setup.assert_called()
  34. assert not self.app.log.setup.call_args[1]['colorize']
  35. def test_init_loader(self):
  36. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  37. b.init_loader()
  38. def test_process_title(self):
  39. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  40. b.set_process_title()
  41. def test_run(self):
  42. b = MockBeat(app=self.app, redirect_stdouts=False)
  43. b.install_sync_handler = Mock(name='beat.install_sync_handler')
  44. b.Service.return_value.max_interval = 3.0
  45. b.run()
  46. b.Service().start.assert_called_with()
  47. def psig(self, fun, *args, **kwargs):
  48. handlers = {}
  49. class Signals(platforms.Signals):
  50. def __setitem__(self, sig, handler):
  51. handlers[sig] = handler
  52. p, platforms.signals = platforms.signals, Signals()
  53. try:
  54. fun(*args, **kwargs)
  55. return handlers
  56. finally:
  57. platforms.signals = p
  58. def test_install_sync_handler(self):
  59. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  60. clock = beat.Service(app=self.app)
  61. clock.start = Mock(name='beat.Service().start')
  62. clock.sync = Mock(name='beat.Service().sync')
  63. handlers = self.psig(b.install_sync_handler, clock)
  64. with pytest.raises(SystemExit):
  65. handlers['SIGINT']('SIGINT', object())
  66. clock.sync.assert_called_with()
  67. @mock.restore_logging()
  68. def test_setup_logging(self):
  69. try:
  70. # py3k
  71. delattr(sys.stdout, 'logger')
  72. except AttributeError:
  73. pass
  74. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  75. b.redirect_stdouts = False
  76. b.app.log.already_setup = False
  77. b.setup_logging()
  78. with pytest.raises(AttributeError):
  79. sys.stdout.logger
  80. import sys
  81. orig_stdout = sys.__stdout__
  82. @patch('celery.apps.beat.logger')
  83. def test_logs_errors(self, logger):
  84. b = MockBeat(
  85. app=self.app, redirect_stdouts=False, socket_timeout=None,
  86. )
  87. b.install_sync_handler = Mock('beat.install_sync_handler')
  88. b.install_sync_handler.side_effect = RuntimeError('xxx')
  89. with mock.restore_logging():
  90. with pytest.raises(RuntimeError):
  91. b.start_scheduler()
  92. logger.critical.assert_called()
  93. @patch('celery.platforms.create_pidlock')
  94. def test_using_pidfile(self, create_pidlock):
  95. b = MockBeat(app=self.app, pidfile='pidfilelockfilepid',
  96. socket_timeout=None, redirect_stdouts=False)
  97. b.install_sync_handler = Mock(name='beat.install_sync_handler')
  98. with mock.stdouts():
  99. b.start_scheduler()
  100. create_pidlock.assert_called()
  101. class test_div:
  102. def setup(self):
  103. self.Beat = self.app.Beat = self.patching('celery.apps.beat.Beat')
  104. self.detached = self.patching('celery.bin.beat.detached')
  105. self.Beat.__name__ = 'Beat'
  106. def test_main(self):
  107. sys.argv = [sys.argv[0], '-s', 'foo']
  108. beat_bin.main(app=self.app)
  109. self.Beat().run.assert_called_with()
  110. def test_detach(self):
  111. cmd = beat_bin.beat()
  112. cmd.app = self.app
  113. cmd.run(detach=True)
  114. self.detached.assert_called()
  115. def test_parse_options(self):
  116. cmd = beat_bin.beat()
  117. cmd.app = self.app
  118. options, args = cmd.parse_options('celery beat', ['-s', 'foo'])
  119. assert options['schedule'] == 'foo'