123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- from __future__ import absolute_import, unicode_literals
- import logging
- import sys
- import pytest
- from case import Mock, mock, patch
- from celery import beat, platforms
- from celery.apps import beat as beatapp
- from celery.bin import beat as beat_bin
- def MockBeat(*args, **kwargs):
- class _Beat(beatapp.Beat):
- Service = Mock(
- name='MockBeat.Service',
- return_value=Mock(name='MockBeat()', max_interval=3.3),
- )
- b = _Beat(*args, **kwargs)
- sched = b.Service.return_value.get_scheduler = Mock()
- sched.return_value.max_interval = 3.3
- return b
- class test_Beat:
- def test_loglevel_string(self):
- b = beatapp.Beat(app=self.app, loglevel='DEBUG',
- redirect_stdouts=False)
- assert b.loglevel == logging.DEBUG
- b2 = beatapp.Beat(app=self.app, loglevel=logging.DEBUG,
- redirect_stdouts=False)
- assert b2.loglevel == logging.DEBUG
- def test_colorize(self):
- self.app.log.setup = Mock()
- b = beatapp.Beat(app=self.app, no_color=True,
- redirect_stdouts=False)
- b.setup_logging()
- self.app.log.setup.assert_called()
- assert not self.app.log.setup.call_args[1]['colorize']
- def test_init_loader(self):
- b = beatapp.Beat(app=self.app, redirect_stdouts=False)
- b.init_loader()
- def test_process_title(self):
- b = beatapp.Beat(app=self.app, redirect_stdouts=False)
- b.set_process_title()
- def test_run(self):
- b = MockBeat(app=self.app, redirect_stdouts=False)
- b.install_sync_handler = Mock(name='beat.install_sync_handler')
- b.Service.return_value.max_interval = 3.0
- b.run()
- b.Service().start.assert_called_with()
- def psig(self, fun, *args, **kwargs):
- handlers = {}
- class Signals(platforms.Signals):
- def __setitem__(self, sig, handler):
- handlers[sig] = handler
- p, platforms.signals = platforms.signals, Signals()
- try:
- fun(*args, **kwargs)
- return handlers
- finally:
- platforms.signals = p
- def test_install_sync_handler(self):
- b = beatapp.Beat(app=self.app, redirect_stdouts=False)
- clock = beat.Service(app=self.app)
- clock.start = Mock(name='beat.Service().start')
- clock.sync = Mock(name='beat.Service().sync')
- handlers = self.psig(b.install_sync_handler, clock)
- with pytest.raises(SystemExit):
- handlers['SIGINT']('SIGINT', object())
- clock.sync.assert_called_with()
- @mock.restore_logging()
- def test_setup_logging(self):
- try:
- # py3k
- delattr(sys.stdout, 'logger')
- except AttributeError:
- pass
- b = beatapp.Beat(app=self.app, redirect_stdouts=False)
- b.redirect_stdouts = False
- b.app.log.already_setup = False
- b.setup_logging()
- with pytest.raises(AttributeError):
- sys.stdout.logger
- import sys
- orig_stdout = sys.__stdout__
- @patch('celery.apps.beat.logger')
- def test_logs_errors(self, logger):
- b = MockBeat(
- app=self.app, redirect_stdouts=False, socket_timeout=None,
- )
- b.install_sync_handler = Mock('beat.install_sync_handler')
- b.install_sync_handler.side_effect = RuntimeError('xxx')
- with mock.restore_logging():
- with pytest.raises(RuntimeError):
- b.start_scheduler()
- logger.critical.assert_called()
- @patch('celery.platforms.create_pidlock')
- def test_using_pidfile(self, create_pidlock):
- b = MockBeat(app=self.app, pidfile='pidfilelockfilepid',
- socket_timeout=None, redirect_stdouts=False)
- b.install_sync_handler = Mock(name='beat.install_sync_handler')
- with mock.stdouts():
- b.start_scheduler()
- create_pidlock.assert_called()
- class test_div:
- def setup(self):
- self.Beat = self.app.Beat = self.patching('celery.apps.beat.Beat')
- self.detached = self.patching('celery.bin.beat.detached')
- self.Beat.__name__ = 'Beat'
- def test_main(self):
- sys.argv = [sys.argv[0], '-s', 'foo']
- beat_bin.main(app=self.app)
- self.Beat().run.assert_called_with()
- def test_detach(self):
- cmd = beat_bin.beat()
- cmd.app = self.app
- cmd.run(detach=True)
- self.detached.assert_called()
- def test_parse_options(self):
- cmd = beat_bin.beat()
- cmd.app = self.app
- options, args = cmd.parse_options('celery beat', ['-s', 'foo'])
- assert options['schedule'] == 'foo'
|