| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737 | 
							- from __future__ import absolute_import, unicode_literals
 
- import errno
 
- from datetime import datetime, timedelta
 
- from pickle import dumps, loads
 
- import pytest
 
- from case import Mock, call, patch, skip
 
- from celery import __version__, beat, uuid
 
- from celery.beat import event_t
 
- from celery.five import keys, string_t
 
- from celery.schedules import crontab, schedule
 
- from celery.utils.objects import Bunch
 
- class MockShelve(dict):
 
-     closed = False
 
-     synced = False
 
-     def close(self):
 
-         self.closed = True
 
-     def sync(self):
 
-         self.synced = True
 
- class MockService(object):
 
-     started = False
 
-     stopped = False
 
-     def __init__(self, *args, **kwargs):
 
-         pass
 
-     def start(self, **kwargs):
 
-         self.started = True
 
-     def stop(self, **kwargs):
 
-         self.stopped = True
 
- class test_ScheduleEntry:
 
-     Entry = beat.ScheduleEntry
 
-     def create_entry(self, **kwargs):
 
-         entry = {
 
-             'name': 'celery.unittest.add',
 
-             'schedule': timedelta(seconds=10),
 
-             'args': (2, 2),
 
-             'options': {'routing_key': 'cpu'},
 
-             'app': self.app,
 
-         }
 
-         return self.Entry(**dict(entry, **kwargs))
 
-     def test_next(self):
 
-         entry = self.create_entry(schedule=10)
 
-         assert entry.last_run_at
 
-         assert isinstance(entry.last_run_at, datetime)
 
-         assert entry.total_run_count == 0
 
-         next_run_at = entry.last_run_at + timedelta(seconds=10)
 
-         next_entry = entry.next(next_run_at)
 
-         assert next_entry.last_run_at >= next_run_at
 
-         assert next_entry.total_run_count == 1
 
-     def test_is_due(self):
 
-         entry = self.create_entry(schedule=timedelta(seconds=10))
 
-         assert entry.app is self.app
 
-         assert entry.schedule.app is self.app
 
-         due1, next_time_to_run1 = entry.is_due()
 
-         assert not due1
 
-         assert next_time_to_run1 > 9
 
-         next_run_at = entry.last_run_at - timedelta(seconds=10)
 
-         next_entry = entry.next(next_run_at)
 
-         due2, next_time_to_run2 = next_entry.is_due()
 
-         assert due2
 
-         assert next_time_to_run2 > 9
 
-     def test_repr(self):
 
-         entry = self.create_entry()
 
-         assert '<ScheduleEntry:' in repr(entry)
 
-     def test_reduce(self):
 
-         entry = self.create_entry(schedule=timedelta(seconds=10))
 
-         fun, args = entry.__reduce__()
 
-         res = fun(*args)
 
-         assert res.schedule == entry.schedule
 
-     def test_lt(self):
 
-         e1 = self.create_entry(schedule=timedelta(seconds=10))
 
-         e2 = self.create_entry(schedule=timedelta(seconds=2))
 
-         # order doesn't matter, see comment in __lt__
 
-         res1 = e1 < e2  # noqa
 
-         try:
 
-             res2 = e1 < object()  # noqa
 
-         except TypeError:
 
-             pass
 
-     def test_update(self):
 
-         entry = self.create_entry()
 
-         assert entry.schedule == timedelta(seconds=10)
 
-         assert entry.args == (2, 2)
 
-         assert entry.kwargs == {}
 
-         assert entry.options == {'routing_key': 'cpu'}
 
-         entry2 = self.create_entry(schedule=timedelta(minutes=20),
 
-                                    args=(16, 16),
 
-                                    kwargs={'callback': 'foo.bar.baz'},
 
-                                    options={'routing_key': 'urgent'})
 
-         entry.update(entry2)
 
-         assert entry.schedule == schedule(timedelta(minutes=20))
 
-         assert entry.args == (16, 16)
 
-         assert entry.kwargs == {'callback': 'foo.bar.baz'}
 
-         assert entry.options == {'routing_key': 'urgent'}
 
- class mScheduler(beat.Scheduler):
 
-     def __init__(self, *args, **kwargs):
 
-         self.sent = []
 
-         beat.Scheduler.__init__(self, *args, **kwargs)
 
-     def send_task(self, name=None, args=None, kwargs=None, **options):
 
-         self.sent.append({'name': name,
 
-                           'args': args,
 
-                           'kwargs': kwargs,
 
-                           'options': options})
 
-         return self.app.AsyncResult(uuid())
 
- class mSchedulerSchedulingError(mScheduler):
 
-     def send_task(self, *args, **kwargs):
 
-         raise beat.SchedulingError('Could not apply task')
 
- class mSchedulerRuntimeError(mScheduler):
 
-     def is_due(self, *args, **kwargs):
 
-         raise RuntimeError('dict modified while itervalues')
 
- class mocked_schedule(schedule):
 
-     def __init__(self, is_due, next_run_at):
 
-         self._is_due = is_due
 
-         self._next_run_at = next_run_at
 
-         self.run_every = timedelta(seconds=1)
 
-         self.nowfun = datetime.utcnow
 
-     def is_due(self, last_run_at):
 
-         return self._is_due, self._next_run_at
 
- always_due = mocked_schedule(True, 1)
 
- always_pending = mocked_schedule(False, 1)
 
- class test_Scheduler:
 
-     def test_custom_schedule_dict(self):
 
-         custom = {'foo': 'bar'}
 
-         scheduler = mScheduler(app=self.app, schedule=custom, lazy=True)
 
-         assert scheduler.data is custom
 
-     def test_apply_async_uses_registered_task_instances(self):
 
-         @self.app.task(shared=False)
 
-         def foo():
 
-             pass
 
-         foo.apply_async = Mock(name='foo.apply_async')
 
-         assert foo.name in foo._get_app().tasks
 
-         scheduler = mScheduler(app=self.app)
 
-         scheduler.apply_async(scheduler.Entry(task=foo.name, app=self.app))
 
-         foo.apply_async.assert_called()
 
-     def test_should_sync(self):
 
-         @self.app.task(shared=False)
 
-         def not_sync():
 
-             pass
 
-         not_sync.apply_async = Mock()
 
-         s = mScheduler(app=self.app)
 
-         s._do_sync = Mock()
 
-         s.should_sync = Mock()
 
-         s.should_sync.return_value = True
 
-         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
 
-         s._do_sync.assert_called_with()
 
-         s._do_sync = Mock()
 
-         s.should_sync.return_value = False
 
-         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
 
-         s._do_sync.assert_not_called()
 
-     def test_should_sync_increments_sync_every_counter(self):
 
-         self.app.conf.beat_sync_every = 2
 
-         @self.app.task(shared=False)
 
-         def not_sync():
 
-             pass
 
-         not_sync.apply_async = Mock()
 
-         s = mScheduler(app=self.app)
 
-         assert s.sync_every_tasks == 2
 
-         s._do_sync = Mock()
 
-         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
 
-         assert s._tasks_since_sync == 1
 
-         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
 
-         s._do_sync.assert_called_with()
 
-         self.app.conf.beat_sync_every = 0
 
-     def test_sync_task_counter_resets_on_do_sync(self):
 
-         self.app.conf.beat_sync_every = 1
 
-         @self.app.task(shared=False)
 
-         def not_sync():
 
-             pass
 
-         not_sync.apply_async = Mock()
 
-         s = mScheduler(app=self.app)
 
-         assert s.sync_every_tasks == 1
 
-         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
 
-         assert s._tasks_since_sync == 0
 
-         self.app.conf.beat_sync_every = 0
 
-     @patch('celery.app.base.Celery.send_task')
 
-     def test_send_task(self, send_task):
 
-         b = beat.Scheduler(app=self.app)
 
-         b.send_task('tasks.add', countdown=10)
 
-         send_task.assert_called_with('tasks.add', countdown=10)
 
-     def test_info(self):
 
-         scheduler = mScheduler(app=self.app)
 
-         assert isinstance(scheduler.info, string_t)
 
-     def test_maybe_entry(self):
 
-         s = mScheduler(app=self.app)
 
-         entry = s.Entry(name='add every', task='tasks.add', app=self.app)
 
-         assert s._maybe_entry(entry.name, entry) is entry
 
-         assert s._maybe_entry('add every', {'task': 'tasks.add'})
 
-     def test_set_schedule(self):
 
-         s = mScheduler(app=self.app)
 
-         s.schedule = {'foo': 'bar'}
 
-         assert s.data == {'foo': 'bar'}
 
-     @patch('kombu.connection.Connection.ensure_connection')
 
-     def test_ensure_connection_error_handler(self, ensure):
 
-         s = mScheduler(app=self.app)
 
-         assert s._ensure_connected()
 
-         ensure.assert_called()
 
-         callback = ensure.call_args[0][0]
 
-         callback(KeyError(), 5)
 
-     def test_install_default_entries(self):
 
-         self.app.conf.result_expires = None
 
-         self.app.conf.beat_schedule = {}
 
-         s = mScheduler(app=self.app)
 
-         s.install_default_entries({})
 
-         assert 'celery.backend_cleanup' not in s.data
 
-         self.app.backend.supports_autoexpire = False
 
-         self.app.conf.result_expires = 30
 
-         s = mScheduler(app=self.app)
 
-         s.install_default_entries({})
 
-         assert 'celery.backend_cleanup' in s.data
 
-         self.app.backend.supports_autoexpire = True
 
-         self.app.conf.result_expires = 31
 
-         s = mScheduler(app=self.app)
 
-         s.install_default_entries({})
 
-         assert 'celery.backend_cleanup' not in s.data
 
-     def test_due_tick(self):
 
-         scheduler = mScheduler(app=self.app)
 
-         scheduler.add(name='test_due_tick',
 
-                       schedule=always_due,
 
-                       args=(1, 2),
 
-                       kwargs={'foo': 'bar'})
 
-         assert scheduler.tick() == 0
 
-     @patch('celery.beat.error')
 
-     def test_due_tick_SchedulingError(self, error):
 
-         scheduler = mSchedulerSchedulingError(app=self.app)
 
-         scheduler.add(name='test_due_tick_SchedulingError',
 
-                       schedule=always_due)
 
-         assert scheduler.tick() == 0
 
-         error.assert_called()
 
-     def test_pending_tick(self):
 
-         scheduler = mScheduler(app=self.app)
 
-         scheduler.add(name='test_pending_tick',
 
-                       schedule=always_pending)
 
-         assert scheduler.tick() == 1 - 0.010
 
-     def test_honors_max_interval(self):
 
-         scheduler = mScheduler(app=self.app)
 
-         maxi = scheduler.max_interval
 
-         scheduler.add(name='test_honors_max_interval',
 
-                       schedule=mocked_schedule(False, maxi * 4))
 
-         assert scheduler.tick() == maxi
 
-     def test_ticks(self):
 
-         scheduler = mScheduler(app=self.app)
 
-         nums = [600, 300, 650, 120, 250, 36]
 
-         s = {'test_ticks%s' % i: {'schedule': mocked_schedule(False, j)}
 
-              for i, j in enumerate(nums)}
 
-         scheduler.update_from_dict(s)
 
-         assert scheduler.tick() == min(nums) - 0.010
 
-     def test_ticks_schedule_change(self):
 
-         # initialise schedule and check heap is not initialized
 
-         scheduler = mScheduler(app=self.app)
 
-         assert scheduler._heap is None
 
-         # set initial schedule and check heap is updated
 
-         schedule_5 = schedule(5)
 
-         scheduler.add(name='test_schedule', schedule=schedule_5)
 
-         scheduler.tick()
 
-         assert scheduler._heap[0].entry.schedule == schedule_5
 
-         # update schedule and check heap is updated
 
-         schedule_10 = schedule(10)
 
-         scheduler.add(name='test_schedule', schedule=schedule(10))
 
-         scheduler.tick()
 
-         assert scheduler._heap[0].entry.schedule == schedule_10
 
-     def test_schedule_no_remain(self):
 
-         scheduler = mScheduler(app=self.app)
 
-         scheduler.add(name='test_schedule_no_remain',
 
-                       schedule=mocked_schedule(False, None))
 
-         assert scheduler.tick() == scheduler.max_interval
 
-     def test_interface(self):
 
-         scheduler = mScheduler(app=self.app)
 
-         scheduler.sync()
 
-         scheduler.setup_schedule()
 
-         scheduler.close()
 
-     def test_merge_inplace(self):
 
-         a = mScheduler(app=self.app)
 
-         b = mScheduler(app=self.app)
 
-         a.update_from_dict({'foo': {'schedule': mocked_schedule(True, 10)},
 
-                             'bar': {'schedule': mocked_schedule(True, 20)}})
 
-         b.update_from_dict({'bar': {'schedule': mocked_schedule(True, 40)},
 
-                             'baz': {'schedule': mocked_schedule(True, 10)}})
 
-         a.merge_inplace(b.schedule)
 
-         assert 'foo' not in a.schedule
 
-         assert 'baz' in a.schedule
 
-         assert a.schedule['bar'].schedule._next_run_at == 40
 
-     @patch('celery.beat.Scheduler._when', return_value=1)
 
-     def test_populate_heap(self, _when):
 
-         scheduler = mScheduler(app=self.app)
 
-         scheduler.update_from_dict(
 
-             {'foo': {'schedule': mocked_schedule(True, 10)}}
 
-         )
 
-         scheduler.populate_heap()
 
-         assert scheduler._heap == [event_t(1, 5, scheduler.schedule['foo'])]
 
-     def create_schedule_entry(self, schedule=None, args=(), kwargs={},
 
-                               options={}, task=None):
 
-         entry = {
 
-             'name': 'celery.unittest.add',
 
-             'schedule': schedule,
 
-             'app': self.app,
 
-             'args': args,
 
-             'kwargs': kwargs,
 
-             'options': options,
 
-             'task': task
 
-         }
 
-         return beat.ScheduleEntry(**dict(entry))
 
-     def test_schedule_equal_schedule_vs_schedule_success(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(schedule=schedule(5))}
 
-         b = {'a': self.create_schedule_entry(schedule=schedule(5))}
 
-         assert scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_schedule_vs_schedule_fail(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(schedule=schedule(5))}
 
-         b = {'a': self.create_schedule_entry(schedule=schedule(10))}
 
-         assert not scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_crontab_vs_crontab_success(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(schedule=crontab(minute=5))}
 
-         b = {'a': self.create_schedule_entry(schedule=crontab(minute=5))}
 
-         assert scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_crontab_vs_crontab_fail(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(schedule=crontab(minute=5))}
 
-         b = {'a': self.create_schedule_entry(schedule=crontab(minute=10))}
 
-         assert not scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_crontab_vs_schedule_fail(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(schedule=crontab(minute=5))}
 
-         b = {'a': self.create_schedule_entry(schedule=schedule(5))}
 
-         assert not scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_different_key_fail(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(schedule=schedule(5))}
 
-         b = {'b': self.create_schedule_entry(schedule=schedule(5))}
 
-         assert not scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_args_vs_args_success(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(args='a')}
 
-         b = {'a': self.create_schedule_entry(args='a')}
 
-         assert scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_args_vs_args_fail(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(args='a')}
 
-         b = {'a': self.create_schedule_entry(args='b')}
 
-         assert not scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_kwargs_vs_kwargs_success(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(kwargs={'a': 'a'})}
 
-         b = {'a': self.create_schedule_entry(kwargs={'a': 'a'})}
 
-         assert scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_kwargs_vs_kwargs_fail(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(kwargs={'a': 'a'})}
 
-         b = {'a': self.create_schedule_entry(kwargs={'b': 'b'})}
 
-         assert not scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_options_vs_options_success(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(options={'a': 'a'})}
 
-         b = {'a': self.create_schedule_entry(options={'a': 'a'})}
 
-         assert scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_options_vs_options_fail(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(options={'a': 'a'})}
 
-         b = {'a': self.create_schedule_entry(options={'b': 'b'})}
 
-         assert not scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_task_vs_task_success(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(task='a')}
 
-         b = {'a': self.create_schedule_entry(task='a')}
 
-         assert scheduler.schedules_equal(a, b)
 
-     def test_schedule_equal_task_vs_task_fail(self):
 
-         scheduler = beat.Scheduler(app=self.app)
 
-         a = {'a': self.create_schedule_entry(task='a')}
 
-         b = {'a': self.create_schedule_entry(task='b')}
 
-         assert not scheduler.schedules_equal(a, b)
 
- def create_persistent_scheduler(shelv=None):
 
-     if shelv is None:
 
-         shelv = MockShelve()
 
-     class MockPersistentScheduler(beat.PersistentScheduler):
 
-         sh = shelv
 
-         persistence = Bunch(
 
-             open=lambda *a, **kw: shelv,
 
-         )
 
-         tick_raises_exit = False
 
-         shutdown_service = None
 
-         def tick(self):
 
-             if self.tick_raises_exit:
 
-                 raise SystemExit()
 
-             if self.shutdown_service:
 
-                 self.shutdown_service._is_shutdown.set()
 
-             return 0.0
 
-     return MockPersistentScheduler, shelv
 
- def create_persistent_scheduler_w_call_logging(shelv=None):
 
-     if shelv is None:
 
-         shelv = MockShelve()
 
-     class MockPersistentScheduler(beat.PersistentScheduler):
 
-         sh = shelv
 
-         persistence = Bunch(
 
-             open=lambda *a, **kw: shelv,
 
-         )
 
-         def __init__(self, *args, **kwargs):
 
-             self.sent = []
 
-             beat.PersistentScheduler.__init__(self, *args, **kwargs)
 
-         def send_task(self, task=None, args=None, kwargs=None, **options):
 
-             self.sent.append({'task': task,
 
-                               'args': args,
 
-                               'kwargs': kwargs,
 
-                               'options': options})
 
-             return self.app.AsyncResult(uuid())
 
-     return MockPersistentScheduler, shelv
 
- class test_PersistentScheduler:
 
-     @patch('os.remove')
 
-     def test_remove_db(self, remove):
 
-         s = create_persistent_scheduler()[0](app=self.app,
 
-                                              schedule_filename='schedule')
 
-         s._remove_db()
 
-         remove.assert_has_calls(
 
-             [call('schedule' + suffix) for suffix in s.known_suffixes]
 
-         )
 
-         err = OSError()
 
-         err.errno = errno.ENOENT
 
-         remove.side_effect = err
 
-         s._remove_db()
 
-         err.errno = errno.EPERM
 
-         with pytest.raises(OSError):
 
-             s._remove_db()
 
-     def test_setup_schedule(self):
 
-         s = create_persistent_scheduler()[0](app=self.app,
 
-                                              schedule_filename='schedule')
 
-         opens = s.persistence.open = Mock()
 
-         s._remove_db = Mock()
 
-         def effect(*args, **kwargs):
 
-             if opens.call_count > 1:
 
-                 return s.sh
 
-             raise OSError()
 
-         opens.side_effect = effect
 
-         s.setup_schedule()
 
-         s._remove_db.assert_called_with()
 
-         s._store = {str('__version__'): 1}
 
-         s.setup_schedule()
 
-         s._store.clear = Mock()
 
-         op = s.persistence.open = Mock()
 
-         op.return_value = s._store
 
-         s._store[str('tz')] = 'FUNKY'
 
-         s.setup_schedule()
 
-         op.assert_called_with(s.schedule_filename, writeback=True)
 
-         s._store.clear.assert_called_with()
 
-         s._store[str('utc_enabled')] = False
 
-         s._store.clear = Mock()
 
-         s.setup_schedule()
 
-         s._store.clear.assert_called_with()
 
-     def test_get_schedule(self):
 
-         s = create_persistent_scheduler()[0](
 
-             schedule_filename='schedule', app=self.app,
 
-         )
 
-         s._store = {str('entries'): {}}
 
-         s.schedule = {'foo': 'bar'}
 
-         assert s.schedule == {'foo': 'bar'}
 
-         assert s._store[str('entries')] == s.schedule
 
-     def test_run_all_due_tasks_after_restart(self):
 
-         scheduler_class, shelve = create_persistent_scheduler_w_call_logging()
 
-         shelve['tz'] = 'UTC'
 
-         shelve['utc_enabled'] = True
 
-         shelve['__version__'] = __version__
 
-         cur_seconds = 20
 
-         def now_func():
 
-             return datetime(2018, 1, 1, 1, 11, cur_seconds)
 
-         app_schedule = {
 
-             'first_missed': {'schedule': crontab(
 
-                 minute='*/10', nowfun=now_func), 'task': 'first_missed'},
 
-             'second_missed': {'schedule': crontab(
 
-                 minute='*/1', nowfun=now_func), 'task': 'second_missed'},
 
-             'non_missed': {'schedule': crontab(
 
-                 minute='*/13', nowfun=now_func), 'task': 'non_missed'}
 
-         }
 
-         shelve['entries'] = {
 
-             'first_missed': beat.ScheduleEntry(
 
-                 'first_missed', 'first_missed',
 
-                 last_run_at=now_func() - timedelta(minutes=2),
 
-                 total_run_count=10,
 
-                 schedule=app_schedule['first_missed']['schedule']),
 
-             'second_missed': beat.ScheduleEntry(
 
-                 'second_missed', 'second_missed',
 
-                 last_run_at=now_func() - timedelta(minutes=2),
 
-                 total_run_count=10,
 
-                 schedule=app_schedule['second_missed']['schedule']),
 
-             'non_missed': beat.ScheduleEntry(
 
-                 'non_missed', 'non_missed',
 
-                 last_run_at=now_func() - timedelta(minutes=2),
 
-                 total_run_count=10,
 
-                 schedule=app_schedule['non_missed']['schedule']),
 
-         }
 
-         self.app.conf.beat_schedule = app_schedule
 
-         scheduler = scheduler_class(self.app)
 
-         max_iter_number = 5
 
-         for i in range(max_iter_number):
 
-             delay = scheduler.tick()
 
-             if delay > 0:
 
-                 break
 
-         assert {'first_missed', 'second_missed'} == {
 
-             item['task'] for item in scheduler.sent}
 
-         # ensure next call on the beginning of next min
 
-         assert abs(60 - cur_seconds - delay) < 1
 
- class test_Service:
 
-     def get_service(self):
 
-         Scheduler, mock_shelve = create_persistent_scheduler()
 
-         return beat.Service(app=self.app, scheduler_cls=Scheduler), mock_shelve
 
-     def test_pickleable(self):
 
-         s = beat.Service(app=self.app, scheduler_cls=Mock)
 
-         assert loads(dumps(s))
 
-     def test_start(self):
 
-         s, sh = self.get_service()
 
-         schedule = s.scheduler.schedule
 
-         assert isinstance(schedule, dict)
 
-         assert isinstance(s.scheduler, beat.Scheduler)
 
-         scheduled = list(schedule.keys())
 
-         for task_name in keys(sh[str('entries')]):
 
-             assert task_name in scheduled
 
-         s.sync()
 
-         assert sh.closed
 
-         assert sh.synced
 
-         assert s._is_stopped.isSet()
 
-         s.sync()
 
-         s.stop(wait=False)
 
-         assert s._is_shutdown.isSet()
 
-         s.stop(wait=True)
 
-         assert s._is_shutdown.isSet()
 
-         p = s.scheduler._store
 
-         s.scheduler._store = None
 
-         try:
 
-             s.scheduler.sync()
 
-         finally:
 
-             s.scheduler._store = p
 
-     def test_start_embedded_process(self):
 
-         s, sh = self.get_service()
 
-         s._is_shutdown.set()
 
-         s.start(embedded_process=True)
 
-     def test_start_thread(self):
 
-         s, sh = self.get_service()
 
-         s._is_shutdown.set()
 
-         s.start(embedded_process=False)
 
-     def test_start_tick_raises_exit_error(self):
 
-         s, sh = self.get_service()
 
-         s.scheduler.tick_raises_exit = True
 
-         s.start()
 
-         assert s._is_shutdown.isSet()
 
-     def test_start_manages_one_tick_before_shutdown(self):
 
-         s, sh = self.get_service()
 
-         s.scheduler.shutdown_service = s
 
-         s.start()
 
-         assert s._is_shutdown.isSet()
 
- class test_EmbeddedService:
 
-     @skip.unless_module('_multiprocessing', name='multiprocessing')
 
-     def xxx_start_stop_process(self):
 
-         from billiard.process import Process
 
-         s = beat.EmbeddedService(self.app)
 
-         assert isinstance(s, Process)
 
-         assert isinstance(s.service, beat.Service)
 
-         s.service = MockService()
 
-         class _Popen(object):
 
-             terminated = False
 
-             def terminate(self):
 
-                 self.terminated = True
 
-         with patch('celery.platforms.close_open_fds'):
 
-             s.run()
 
-         assert s.service.started
 
-         s._popen = _Popen()
 
-         s.stop()
 
-         assert s.service.stopped
 
-         assert s._popen.terminated
 
-     def test_start_stop_threaded(self):
 
-         s = beat.EmbeddedService(self.app, thread=True)
 
-         from threading import Thread
 
-         assert isinstance(s, Thread)
 
-         assert isinstance(s.service, beat.Service)
 
-         s.service = MockService()
 
-         s.run()
 
-         assert s.service.started
 
-         s.stop()
 
-         assert s.service.stopped
 
- class test_schedule:
 
-     def test_maybe_make_aware(self):
 
-         x = schedule(10, app=self.app)
 
-         x.utc_enabled = True
 
-         d = x.maybe_make_aware(datetime.utcnow())
 
-         assert d.tzinfo
 
-         x.utc_enabled = False
 
-         d2 = x.maybe_make_aware(datetime.utcnow())
 
-         assert d2.tzinfo
 
-     def test_to_local(self):
 
-         x = schedule(10, app=self.app)
 
-         x.utc_enabled = True
 
-         d = x.to_local(datetime.utcnow())
 
-         assert d.tzinfo is None
 
-         x.utc_enabled = False
 
-         d = x.to_local(datetime.utcnow())
 
-         assert d.tzinfo
 
 
  |