| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539 | from __future__ import absolute_import, unicode_literalsimport errnoimport pytestfrom datetime import datetime, timedeltafrom pickle import dumps, loadsfrom case import Mock, call, patch, skipfrom celery import beatfrom celery import uuidfrom celery.five import keys, string_tfrom celery.schedules import schedulefrom celery.utils.objects import Bunchclass MockShelve(dict):    closed = False    synced = False    def close(self):        self.closed = True    def sync(self):        self.synced = Trueclass MockService(object):    started = False    stopped = False    def __init__(self, *args, **kwargs):        pass    def start(self, **kwargs):        self.started = True    def stop(self, **kwargs):        self.stopped = Trueclass test_ScheduleEntry:    Entry = beat.ScheduleEntry    def create_entry(self, **kwargs):        entry = dict(            name='celery.unittest.add',            schedule=timedelta(seconds=10),            args=(2, 2),            options={'routing_key': 'cpu'},            app=self.app,        )        return self.Entry(**dict(entry, **kwargs))    def test_next(self):        entry = self.create_entry(schedule=10)        assert entry.last_run_at        assert isinstance(entry.last_run_at, datetime)        assert entry.total_run_count == 0        next_run_at = entry.last_run_at + timedelta(seconds=10)        next_entry = entry.next(next_run_at)        assert next_entry.last_run_at >= next_run_at        assert next_entry.total_run_count == 1    def test_is_due(self):        entry = self.create_entry(schedule=timedelta(seconds=10))        assert entry.app is self.app        assert entry.schedule.app is self.app        due1, next_time_to_run1 = entry.is_due()        assert not due1        assert next_time_to_run1 > 9        next_run_at = entry.last_run_at - timedelta(seconds=10)        next_entry = entry.next(next_run_at)        due2, next_time_to_run2 = next_entry.is_due()        assert due2        assert next_time_to_run2 > 9    def test_repr(self):        entry = self.create_entry()        assert '<ScheduleEntry:' in repr(entry)    def test_reduce(self):        entry = self.create_entry(schedule=timedelta(seconds=10))        fun, args = entry.__reduce__()        res = fun(*args)        assert res.schedule == entry.schedule    def test_lt(self):        e1 = self.create_entry(schedule=timedelta(seconds=10))        e2 = self.create_entry(schedule=timedelta(seconds=2))        # order doesn't matter, see comment in __lt__        res1 = e1 < e2  # noqa        try:            res2 = e1 < object()  # noqa        except TypeError:            pass    def test_update(self):        entry = self.create_entry()        assert entry.schedule == timedelta(seconds=10)        assert entry.args == (2, 2)        assert entry.kwargs == {}        assert entry.options == {'routing_key': 'cpu'}        entry2 = self.create_entry(schedule=timedelta(minutes=20),                                   args=(16, 16),                                   kwargs={'callback': 'foo.bar.baz'},                                   options={'routing_key': 'urgent'})        entry.update(entry2)        assert entry.schedule == schedule(timedelta(minutes=20))        assert entry.args == (16, 16)        assert entry.kwargs == {'callback': 'foo.bar.baz'}        assert entry.options == {'routing_key': 'urgent'}class mScheduler(beat.Scheduler):    def __init__(self, *args, **kwargs):        self.sent = []        beat.Scheduler.__init__(self, *args, **kwargs)    def send_task(self, name=None, args=None, kwargs=None, **options):        self.sent.append({'name': name,                          'args': args,                          'kwargs': kwargs,                          'options': options})        return self.app.AsyncResult(uuid())class mSchedulerSchedulingError(mScheduler):    def send_task(self, *args, **kwargs):        raise beat.SchedulingError('Could not apply task')class mSchedulerRuntimeError(mScheduler):    def is_due(self, *args, **kwargs):        raise RuntimeError('dict modified while itervalues')class mocked_schedule(schedule):    def __init__(self, is_due, next_run_at):        self._is_due = is_due        self._next_run_at = next_run_at        self.run_every = timedelta(seconds=1)        self.nowfun = datetime.utcnow    def is_due(self, last_run_at):        return self._is_due, self._next_run_atalways_due = mocked_schedule(True, 1)always_pending = mocked_schedule(False, 1)class test_Scheduler:    def test_custom_schedule_dict(self):        custom = {'foo': 'bar'}        scheduler = mScheduler(app=self.app, schedule=custom, lazy=True)        assert scheduler.data is custom    def test_apply_async_uses_registered_task_instances(self):        @self.app.task(shared=False)        def foo():            pass        foo.apply_async = Mock(name='foo.apply_async')        assert foo.name in foo._get_app().tasks        scheduler = mScheduler(app=self.app)        scheduler.apply_async(scheduler.Entry(task=foo.name, app=self.app))        foo.apply_async.assert_called()    def test_should_sync(self):        @self.app.task(shared=False)        def not_sync():            pass        not_sync.apply_async = Mock()        s = mScheduler(app=self.app)        s._do_sync = Mock()        s.should_sync = Mock()        s.should_sync.return_value = True        s.apply_async(s.Entry(task=not_sync.name, app=self.app))        s._do_sync.assert_called_with()        s._do_sync = Mock()        s.should_sync.return_value = False        s.apply_async(s.Entry(task=not_sync.name, app=self.app))        s._do_sync.assert_not_called()    def test_should_sync_increments_sync_every_counter(self):        self.app.conf.beat_sync_every = 2        @self.app.task(shared=False)        def not_sync():            pass        not_sync.apply_async = Mock()        s = mScheduler(app=self.app)        assert s.sync_every_tasks == 2        s._do_sync = Mock()        s.apply_async(s.Entry(task=not_sync.name, app=self.app))        assert s._tasks_since_sync == 1        s.apply_async(s.Entry(task=not_sync.name, app=self.app))        s._do_sync.assert_called_with()        self.app.conf.beat_sync_every = 0    def test_sync_task_counter_resets_on_do_sync(self):        self.app.conf.beat_sync_every = 1        @self.app.task(shared=False)        def not_sync():            pass        not_sync.apply_async = Mock()        s = mScheduler(app=self.app)        assert s.sync_every_tasks == 1        s.apply_async(s.Entry(task=not_sync.name, app=self.app))        assert s._tasks_since_sync == 0        self.app.conf.beat_sync_every = 0    @patch('celery.app.base.Celery.send_task')    def test_send_task(self, send_task):        b = beat.Scheduler(app=self.app)        b.send_task('tasks.add', countdown=10)        send_task.assert_called_with('tasks.add', countdown=10)    def test_info(self):        scheduler = mScheduler(app=self.app)        assert isinstance(scheduler.info, string_t)    def test_maybe_entry(self):        s = mScheduler(app=self.app)        entry = s.Entry(name='add every', task='tasks.add', app=self.app)        assert s._maybe_entry(entry.name, entry) is entry        assert s._maybe_entry('add every', {'task': 'tasks.add'})    def test_set_schedule(self):        s = mScheduler(app=self.app)        s.schedule = {'foo': 'bar'}        assert s.data == {'foo': 'bar'}    @patch('kombu.connection.Connection.ensure_connection')    def test_ensure_connection_error_handler(self, ensure):        s = mScheduler(app=self.app)        assert s._ensure_connected()        ensure.assert_called()        callback = ensure.call_args[0][0]        callback(KeyError(), 5)    def test_install_default_entries(self):        self.app.conf.result_expires = None        self.app.conf.beat_schedule = {}        s = mScheduler(app=self.app)        s.install_default_entries({})        assert 'celery.backend_cleanup' not in s.data        self.app.backend.supports_autoexpire = False        self.app.conf.result_expires = 30        s = mScheduler(app=self.app)        s.install_default_entries({})        assert 'celery.backend_cleanup' in s.data        self.app.backend.supports_autoexpire = True        self.app.conf.result_expires = 31        s = mScheduler(app=self.app)        s.install_default_entries({})        assert 'celery.backend_cleanup' not in s.data    def test_due_tick(self):        scheduler = mScheduler(app=self.app)        scheduler.add(name='test_due_tick',                      schedule=always_due,                      args=(1, 2),                      kwargs={'foo': 'bar'})        assert scheduler.tick() == 0    @patch('celery.beat.error')    def test_due_tick_SchedulingError(self, error):        scheduler = mSchedulerSchedulingError(app=self.app)        scheduler.add(name='test_due_tick_SchedulingError',                      schedule=always_due)        assert scheduler.tick() == 0        error.assert_called()    def test_pending_tick(self):        scheduler = mScheduler(app=self.app)        scheduler.add(name='test_pending_tick',                      schedule=always_pending)        assert scheduler.tick() == 1 - 0.010    def test_honors_max_interval(self):        scheduler = mScheduler(app=self.app)        maxi = scheduler.max_interval        scheduler.add(name='test_honors_max_interval',                      schedule=mocked_schedule(False, maxi * 4))        assert scheduler.tick() == maxi    def test_ticks(self):        scheduler = mScheduler(app=self.app)        nums = [600, 300, 650, 120, 250, 36]        s = dict(('test_ticks%s' % i,                 {'schedule': mocked_schedule(False, j)})                 for i, j in enumerate(nums))        scheduler.update_from_dict(s)        assert scheduler.tick() == min(nums) - 0.010    def test_schedule_no_remain(self):        scheduler = mScheduler(app=self.app)        scheduler.add(name='test_schedule_no_remain',                      schedule=mocked_schedule(False, None))        assert scheduler.tick() == scheduler.max_interval    def test_interface(self):        scheduler = mScheduler(app=self.app)        scheduler.sync()        scheduler.setup_schedule()        scheduler.close()    def test_merge_inplace(self):        a = mScheduler(app=self.app)        b = mScheduler(app=self.app)        a.update_from_dict({'foo': {'schedule': mocked_schedule(True, 10)},                            'bar': {'schedule': mocked_schedule(True, 20)}})        b.update_from_dict({'bar': {'schedule': mocked_schedule(True, 40)},                            'baz': {'schedule': mocked_schedule(True, 10)}})        a.merge_inplace(b.schedule)        assert 'foo' not in a.schedule        assert 'baz' in a.schedule        assert a.schedule['bar'].schedule._next_run_at == 40def create_persistent_scheduler(shelv=None):    if shelv is None:        shelv = MockShelve()    class MockPersistentScheduler(beat.PersistentScheduler):        sh = shelv        persistence = Bunch(            open=lambda *a, **kw: shelv,        )        tick_raises_exit = False        shutdown_service = None        def tick(self):            if self.tick_raises_exit:                raise SystemExit()            if self.shutdown_service:                self.shutdown_service._is_shutdown.set()            return 0.0    return MockPersistentScheduler, shelvclass test_PersistentScheduler:    @patch('os.remove')    def test_remove_db(self, remove):        s = create_persistent_scheduler()[0](app=self.app,                                             schedule_filename='schedule')        s._remove_db()        remove.assert_has_calls(            [call('schedule' + suffix) for suffix in s.known_suffixes]        )        err = OSError()        err.errno = errno.ENOENT        remove.side_effect = err        s._remove_db()        err.errno = errno.EPERM        with pytest.raises(OSError):            s._remove_db()    def test_setup_schedule(self):        s = create_persistent_scheduler()[0](app=self.app,                                             schedule_filename='schedule')        opens = s.persistence.open = Mock()        s._remove_db = Mock()        def effect(*args, **kwargs):            if opens.call_count > 1:                return s.sh            raise OSError()        opens.side_effect = effect        s.setup_schedule()        s._remove_db.assert_called_with()        s._store = {str('__version__'): 1}        s.setup_schedule()        s._store.clear = Mock()        op = s.persistence.open = Mock()        op.return_value = s._store        s._store[str('tz')] = 'FUNKY'        s.setup_schedule()        op.assert_called_with(s.schedule_filename, writeback=True)        s._store.clear.assert_called_with()        s._store[str('utc_enabled')] = False        s._store.clear = Mock()        s.setup_schedule()        s._store.clear.assert_called_with()    def test_get_schedule(self):        s = create_persistent_scheduler()[0](            schedule_filename='schedule', app=self.app,        )        s._store = {str('entries'): {}}        s.schedule = {'foo': 'bar'}        assert s.schedule == {'foo': 'bar'}        assert s._store[str('entries')] == s.scheduleclass test_Service:    def get_service(self):        Scheduler, mock_shelve = create_persistent_scheduler()        return beat.Service(app=self.app, scheduler_cls=Scheduler), mock_shelve    def test_pickleable(self):        s = beat.Service(app=self.app, scheduler_cls=Mock)        assert loads(dumps(s))    def test_start(self):        s, sh = self.get_service()        schedule = s.scheduler.schedule        assert isinstance(schedule, dict)        assert isinstance(s.scheduler, beat.Scheduler)        scheduled = list(schedule.keys())        for task_name in keys(sh[str('entries')]):            assert task_name in scheduled        s.sync()        assert sh.closed        assert sh.synced        assert s._is_stopped.isSet()        s.sync()        s.stop(wait=False)        assert s._is_shutdown.isSet()        s.stop(wait=True)        assert s._is_shutdown.isSet()        p = s.scheduler._store        s.scheduler._store = None        try:            s.scheduler.sync()        finally:            s.scheduler._store = p    def test_start_embedded_process(self):        s, sh = self.get_service()        s._is_shutdown.set()        s.start(embedded_process=True)    def test_start_thread(self):        s, sh = self.get_service()        s._is_shutdown.set()        s.start(embedded_process=False)    def test_start_tick_raises_exit_error(self):        s, sh = self.get_service()        s.scheduler.tick_raises_exit = True        s.start()        assert s._is_shutdown.isSet()    def test_start_manages_one_tick_before_shutdown(self):        s, sh = self.get_service()        s.scheduler.shutdown_service = s        s.start()        assert s._is_shutdown.isSet()class test_EmbeddedService:    @skip.unless_module('_multiprocessing', name='multiprocessing')    def xxx_start_stop_process(self):        from billiard.process import Process        s = beat.EmbeddedService(self.app)        assert isinstance(s, Process)        assert isinstance(s.service, beat.Service)        s.service = MockService()        class _Popen(object):            terminated = False            def terminate(self):                self.terminated = True        with patch('celery.platforms.close_open_fds'):            s.run()        assert s.service.started        s._popen = _Popen()        s.stop()        assert s.service.stopped        assert s._popen.terminated    def test_start_stop_threaded(self):        s = beat.EmbeddedService(self.app, thread=True)        from threading import Thread        assert isinstance(s, Thread)        assert isinstance(s.service, beat.Service)        s.service = MockService()        s.run()        assert s.service.started        s.stop()        assert s.service.stoppedclass test_schedule:    def test_maybe_make_aware(self):        x = schedule(10, app=self.app)        x.utc_enabled = True        d = x.maybe_make_aware(datetime.utcnow())        assert d.tzinfo        x.utc_enabled = False        d2 = x.maybe_make_aware(datetime.utcnow())        assert d2.tzinfo    def test_to_local(self):        x = schedule(10, app=self.app)        x.utc_enabled = True        d = x.to_local(datetime.utcnow())        assert d.tzinfo is None        x.utc_enabled = False        d = x.to_local(datetime.utcnow())        assert d.tzinfo
 |