| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 | from __future__ import absolute_import, unicode_literalsimport pytestfrom case import Mock, mock, patchfrom celery.app.events import Eventsfrom celery.events.snapshot import Polaroid, evcamclass MockTimer(object):    installed = []    def call_repeatedly(self, secs, fun, *args, **kwargs):        self.installed.append(fun)        return Mock(name='TRef')timer = MockTimer()class test_Polaroid:    def setup(self):        self.state = self.app.events.State()    def test_constructor(self):        x = Polaroid(self.state, app=self.app)        assert x.app is self.app        assert x.state is self.state        assert x.freq        assert x.cleanup_freq        assert x.logger        assert not x.maxrate    def test_install_timers(self):        x = Polaroid(self.state, app=self.app)        x.timer = timer        x.__exit__()        x.__enter__()        assert x.capture in MockTimer.installed        assert x.cleanup in MockTimer.installed        x._tref.cancel.assert_not_called()        x._ctref.cancel.assert_not_called()        x.__exit__()        x._tref.cancel.assert_called()        x._ctref.cancel.assert_called()        x._tref.assert_called()        x._ctref.assert_not_called()    def test_cleanup(self):        x = Polaroid(self.state, app=self.app)        cleanup_signal_sent = [False]        def handler(**kwargs):            cleanup_signal_sent[0] = True        x.cleanup_signal.connect(handler)        x.cleanup()        assert cleanup_signal_sent[0]    def test_shutter__capture(self):        x = Polaroid(self.state, app=self.app)        shutter_signal_sent = [False]        def handler(**kwargs):            shutter_signal_sent[0] = True        x.shutter_signal.connect(handler)        x.shutter()        assert shutter_signal_sent[0]        shutter_signal_sent[0] = False        x.capture()        assert shutter_signal_sent[0]    def test_shutter_maxrate(self):        x = Polaroid(self.state, app=self.app, maxrate='1/h')        shutter_signal_sent = [0]        def handler(**kwargs):            shutter_signal_sent[0] += 1        x.shutter_signal.connect(handler)        for i in range(30):            x.shutter()            x.shutter()            x.shutter()        assert shutter_signal_sent[0] == 1class test_evcam:    class MockReceiver(object):        raise_keyboard_interrupt = False        def capture(self, **kwargs):            if self.__class__.raise_keyboard_interrupt:                raise KeyboardInterrupt()    class MockEvents(Events):        def Receiver(self, *args, **kwargs):            return test_evcam.MockReceiver()    def setup(self):        self.app.events = self.MockEvents()        self.app.events.app = self.app    @mock.restore_logging()    def test_evcam(self):        evcam(Polaroid, timer=timer, app=self.app)        evcam(Polaroid, timer=timer, loglevel='CRITICAL', app=self.app)        self.MockReceiver.raise_keyboard_interrupt = True        try:            with pytest.raises(SystemExit):                evcam(Polaroid, timer=timer, app=self.app)        finally:            self.MockReceiver.raise_keyboard_interrupt = False    @patch('celery.platforms.create_pidlock')    def test_evcam_pidfile(self, create_pidlock):        evcam(Polaroid, timer=timer, pidfile='/var/pid', app=self.app)        create_pidlock.assert_called_with('/var/pid')
 |