123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610 |
- from __future__ import absolute_import, unicode_literals
- import errno
- import pytest
- from datetime import datetime, timedelta
- from pickle import dumps, loads
- from case import Mock, call, patch, skip
- from celery import beat
- from celery import uuid
- from celery.beat import event_t
- from celery.five import keys, string_t
- from celery.schedules import schedule, crontab
- from celery.utils.objects import Bunch
- class MockShelve(dict):
- closed = False
- synced = False
- def close(self):
- self.closed = True
- def sync(self):
- self.synced = True
- class MockService(object):
- started = False
- stopped = False
- def __init__(self, *args, **kwargs):
- pass
- def start(self, **kwargs):
- self.started = True
- def stop(self, **kwargs):
- self.stopped = True
- class test_ScheduleEntry:
- Entry = beat.ScheduleEntry
- def create_entry(self, **kwargs):
- entry = dict(
- name='celery.unittest.add',
- schedule=timedelta(seconds=10),
- args=(2, 2),
- options={'routing_key': 'cpu'},
- app=self.app,
- )
- return self.Entry(**dict(entry, **kwargs))
- def test_next(self):
- entry = self.create_entry(schedule=10)
- assert entry.last_run_at
- assert isinstance(entry.last_run_at, datetime)
- assert entry.total_run_count == 0
- next_run_at = entry.last_run_at + timedelta(seconds=10)
- next_entry = entry.next(next_run_at)
- assert next_entry.last_run_at >= next_run_at
- assert next_entry.total_run_count == 1
- def test_is_due(self):
- entry = self.create_entry(schedule=timedelta(seconds=10))
- assert entry.app is self.app
- assert entry.schedule.app is self.app
- due1, next_time_to_run1 = entry.is_due()
- assert not due1
- assert next_time_to_run1 > 9
- next_run_at = entry.last_run_at - timedelta(seconds=10)
- next_entry = entry.next(next_run_at)
- due2, next_time_to_run2 = next_entry.is_due()
- assert due2
- assert next_time_to_run2 > 9
- def test_repr(self):
- entry = self.create_entry()
- assert '<ScheduleEntry:' in repr(entry)
- def test_reduce(self):
- entry = self.create_entry(schedule=timedelta(seconds=10))
- fun, args = entry.__reduce__()
- res = fun(*args)
- assert res.schedule == entry.schedule
- def test_lt(self):
- e1 = self.create_entry(schedule=timedelta(seconds=10))
- e2 = self.create_entry(schedule=timedelta(seconds=2))
- # order doesn't matter, see comment in __lt__
- res1 = e1 < e2 # noqa
- try:
- res2 = e1 < object() # noqa
- except TypeError:
- pass
- def test_update(self):
- entry = self.create_entry()
- assert entry.schedule == timedelta(seconds=10)
- assert entry.args == (2, 2)
- assert entry.kwargs == {}
- assert entry.options == {'routing_key': 'cpu'}
- entry2 = self.create_entry(schedule=timedelta(minutes=20),
- args=(16, 16),
- kwargs={'callback': 'foo.bar.baz'},
- options={'routing_key': 'urgent'})
- entry.update(entry2)
- assert entry.schedule == schedule(timedelta(minutes=20))
- assert entry.args == (16, 16)
- assert entry.kwargs == {'callback': 'foo.bar.baz'}
- assert entry.options == {'routing_key': 'urgent'}
- class mScheduler(beat.Scheduler):
- def __init__(self, *args, **kwargs):
- self.sent = []
- beat.Scheduler.__init__(self, *args, **kwargs)
- def send_task(self, name=None, args=None, kwargs=None, **options):
- self.sent.append({'name': name,
- 'args': args,
- 'kwargs': kwargs,
- 'options': options})
- return self.app.AsyncResult(uuid())
- class mSchedulerSchedulingError(mScheduler):
- def send_task(self, *args, **kwargs):
- raise beat.SchedulingError('Could not apply task')
- class mSchedulerRuntimeError(mScheduler):
- def is_due(self, *args, **kwargs):
- raise RuntimeError('dict modified while itervalues')
- class mocked_schedule(schedule):
- def __init__(self, is_due, next_run_at):
- self._is_due = is_due
- self._next_run_at = next_run_at
- self.run_every = timedelta(seconds=1)
- self.nowfun = datetime.utcnow
- def is_due(self, last_run_at):
- return self._is_due, self._next_run_at
- always_due = mocked_schedule(True, 1)
- always_pending = mocked_schedule(False, 1)
- class test_Scheduler:
- def test_custom_schedule_dict(self):
- custom = {'foo': 'bar'}
- scheduler = mScheduler(app=self.app, schedule=custom, lazy=True)
- assert scheduler.data is custom
- def test_apply_async_uses_registered_task_instances(self):
- @self.app.task(shared=False)
- def foo():
- pass
- foo.apply_async = Mock(name='foo.apply_async')
- assert foo.name in foo._get_app().tasks
- scheduler = mScheduler(app=self.app)
- scheduler.apply_async(scheduler.Entry(task=foo.name, app=self.app))
- foo.apply_async.assert_called()
- def test_should_sync(self):
- @self.app.task(shared=False)
- def not_sync():
- pass
- not_sync.apply_async = Mock()
- s = mScheduler(app=self.app)
- s._do_sync = Mock()
- s.should_sync = Mock()
- s.should_sync.return_value = True
- s.apply_async(s.Entry(task=not_sync.name, app=self.app))
- s._do_sync.assert_called_with()
- s._do_sync = Mock()
- s.should_sync.return_value = False
- s.apply_async(s.Entry(task=not_sync.name, app=self.app))
- s._do_sync.assert_not_called()
- def test_should_sync_increments_sync_every_counter(self):
- self.app.conf.beat_sync_every = 2
- @self.app.task(shared=False)
- def not_sync():
- pass
- not_sync.apply_async = Mock()
- s = mScheduler(app=self.app)
- assert s.sync_every_tasks == 2
- s._do_sync = Mock()
- s.apply_async(s.Entry(task=not_sync.name, app=self.app))
- assert s._tasks_since_sync == 1
- s.apply_async(s.Entry(task=not_sync.name, app=self.app))
- s._do_sync.assert_called_with()
- self.app.conf.beat_sync_every = 0
- def test_sync_task_counter_resets_on_do_sync(self):
- self.app.conf.beat_sync_every = 1
- @self.app.task(shared=False)
- def not_sync():
- pass
- not_sync.apply_async = Mock()
- s = mScheduler(app=self.app)
- assert s.sync_every_tasks == 1
- s.apply_async(s.Entry(task=not_sync.name, app=self.app))
- assert s._tasks_since_sync == 0
- self.app.conf.beat_sync_every = 0
- @patch('celery.app.base.Celery.send_task')
- def test_send_task(self, send_task):
- b = beat.Scheduler(app=self.app)
- b.send_task('tasks.add', countdown=10)
- send_task.assert_called_with('tasks.add', countdown=10)
- def test_info(self):
- scheduler = mScheduler(app=self.app)
- assert isinstance(scheduler.info, string_t)
- def test_maybe_entry(self):
- s = mScheduler(app=self.app)
- entry = s.Entry(name='add every', task='tasks.add', app=self.app)
- assert s._maybe_entry(entry.name, entry) is entry
- assert s._maybe_entry('add every', {'task': 'tasks.add'})
- def test_set_schedule(self):
- s = mScheduler(app=self.app)
- s.schedule = {'foo': 'bar'}
- assert s.data == {'foo': 'bar'}
- @patch('kombu.connection.Connection.ensure_connection')
- def test_ensure_connection_error_handler(self, ensure):
- s = mScheduler(app=self.app)
- assert s._ensure_connected()
- ensure.assert_called()
- callback = ensure.call_args[0][0]
- callback(KeyError(), 5)
- def test_install_default_entries(self):
- self.app.conf.result_expires = None
- self.app.conf.beat_schedule = {}
- s = mScheduler(app=self.app)
- s.install_default_entries({})
- assert 'celery.backend_cleanup' not in s.data
- self.app.backend.supports_autoexpire = False
- self.app.conf.result_expires = 30
- s = mScheduler(app=self.app)
- s.install_default_entries({})
- assert 'celery.backend_cleanup' in s.data
- self.app.backend.supports_autoexpire = True
- self.app.conf.result_expires = 31
- s = mScheduler(app=self.app)
- s.install_default_entries({})
- assert 'celery.backend_cleanup' not in s.data
- def test_due_tick(self):
- scheduler = mScheduler(app=self.app)
- scheduler.add(name='test_due_tick',
- schedule=always_due,
- args=(1, 2),
- kwargs={'foo': 'bar'})
- assert scheduler.tick() == 0
- @patch('celery.beat.error')
- def test_due_tick_SchedulingError(self, error):
- scheduler = mSchedulerSchedulingError(app=self.app)
- scheduler.add(name='test_due_tick_SchedulingError',
- schedule=always_due)
- assert scheduler.tick() == 0
- error.assert_called()
- def test_pending_tick(self):
- scheduler = mScheduler(app=self.app)
- scheduler.add(name='test_pending_tick',
- schedule=always_pending)
- assert scheduler.tick() == 1 - 0.010
- def test_honors_max_interval(self):
- scheduler = mScheduler(app=self.app)
- maxi = scheduler.max_interval
- scheduler.add(name='test_honors_max_interval',
- schedule=mocked_schedule(False, maxi * 4))
- assert scheduler.tick() == maxi
- def test_ticks(self):
- scheduler = mScheduler(app=self.app)
- nums = [600, 300, 650, 120, 250, 36]
- s = dict(('test_ticks%s' % i,
- {'schedule': mocked_schedule(False, j)})
- for i, j in enumerate(nums))
- scheduler.update_from_dict(s)
- assert scheduler.tick() == min(nums) - 0.010
- def test_ticks_schedule_change(self):
- # initialise schedule and check heap is not initialized
- scheduler = mScheduler(app=self.app)
- assert scheduler._heap is None
- # set initial schedule and check heap is updated
- schedule_5 = schedule(5)
- scheduler.add(name='test_schedule', schedule=schedule_5)
- scheduler.tick()
- assert scheduler._heap[0].entry.schedule == schedule_5
- # update schedule and check heap is updated
- schedule_10 = schedule(10)
- scheduler.add(name='test_schedule', schedule=schedule(10))
- scheduler.tick()
- assert scheduler._heap[0].entry.schedule == schedule_10
- def test_schedule_no_remain(self):
- scheduler = mScheduler(app=self.app)
- scheduler.add(name='test_schedule_no_remain',
- schedule=mocked_schedule(False, None))
- assert scheduler.tick() == scheduler.max_interval
- def test_interface(self):
- scheduler = mScheduler(app=self.app)
- scheduler.sync()
- scheduler.setup_schedule()
- scheduler.close()
- def test_merge_inplace(self):
- a = mScheduler(app=self.app)
- b = mScheduler(app=self.app)
- a.update_from_dict({'foo': {'schedule': mocked_schedule(True, 10)},
- 'bar': {'schedule': mocked_schedule(True, 20)}})
- b.update_from_dict({'bar': {'schedule': mocked_schedule(True, 40)},
- 'baz': {'schedule': mocked_schedule(True, 10)}})
- a.merge_inplace(b.schedule)
- assert 'foo' not in a.schedule
- assert 'baz' in a.schedule
- assert a.schedule['bar'].schedule._next_run_at == 40
- @patch('celery.beat.Scheduler._when', return_value=1)
- def test_populate_heap(self, _when):
- scheduler = mScheduler(app=self.app)
- scheduler.update_from_dict(
- {'foo': {'schedule': mocked_schedule(True, 10)}}
- )
- scheduler.populate_heap()
- assert scheduler._heap == [event_t(1, 5, scheduler.schedule['foo'])]
- def create_schedule_entry(self, schedule):
- entry = dict(
- name='celery.unittest.add',
- schedule=schedule,
- app=self.app,
- )
- return beat.ScheduleEntry(**dict(entry))
- def test_schedule_equal_schedule_vs_schedule_success(self):
- scheduler = beat.Scheduler(app=self.app)
- a = {'a': self.create_schedule_entry(schedule(5))}
- b = {'a': self.create_schedule_entry(schedule(5))}
- assert scheduler.schedules_equal(a, b)
- def test_schedule_equal_schedule_vs_schedule_fail(self):
- scheduler = beat.Scheduler(app=self.app)
- a = {'a': self.create_schedule_entry(schedule(5))}
- b = {'a': self.create_schedule_entry(schedule(10))}
- assert not scheduler.schedules_equal(a, b)
- def test_schedule_equal_crontab_vs_crontab_success(self):
- scheduler = beat.Scheduler(app=self.app)
- a = {'a': self.create_schedule_entry(crontab(minute=5))}
- b = {'a': self.create_schedule_entry(crontab(minute=5))}
- assert scheduler.schedules_equal(a, b)
- def test_schedule_equal_crontab_vs_crontab_fail(self):
- scheduler = beat.Scheduler(app=self.app)
- a = {'a': self.create_schedule_entry(crontab(minute=5))}
- b = {'a': self.create_schedule_entry(crontab(minute=10))}
- assert not scheduler.schedules_equal(a, b)
- def test_schedule_equal_crontab_vs_schedule_fail(self):
- scheduler = beat.Scheduler(app=self.app)
- a = {'a': self.create_schedule_entry(crontab(minute=5))}
- b = {'a': self.create_schedule_entry(schedule(5))}
- assert not scheduler.schedules_equal(a, b)
- def test_schedule_equal_different_key_fail(self):
- scheduler = beat.Scheduler(app=self.app)
- a = {'a': self.create_schedule_entry(schedule(5))}
- b = {'b': self.create_schedule_entry(schedule(5))}
- assert not scheduler.schedules_equal(a, b)
- def create_persistent_scheduler(shelv=None):
- if shelv is None:
- shelv = MockShelve()
- class MockPersistentScheduler(beat.PersistentScheduler):
- sh = shelv
- persistence = Bunch(
- open=lambda *a, **kw: shelv,
- )
- tick_raises_exit = False
- shutdown_service = None
- def tick(self):
- if self.tick_raises_exit:
- raise SystemExit()
- if self.shutdown_service:
- self.shutdown_service._is_shutdown.set()
- return 0.0
- return MockPersistentScheduler, shelv
- class test_PersistentScheduler:
- @patch('os.remove')
- def test_remove_db(self, remove):
- s = create_persistent_scheduler()[0](app=self.app,
- schedule_filename='schedule')
- s._remove_db()
- remove.assert_has_calls(
- [call('schedule' + suffix) for suffix in s.known_suffixes]
- )
- err = OSError()
- err.errno = errno.ENOENT
- remove.side_effect = err
- s._remove_db()
- err.errno = errno.EPERM
- with pytest.raises(OSError):
- s._remove_db()
- def test_setup_schedule(self):
- s = create_persistent_scheduler()[0](app=self.app,
- schedule_filename='schedule')
- opens = s.persistence.open = Mock()
- s._remove_db = Mock()
- def effect(*args, **kwargs):
- if opens.call_count > 1:
- return s.sh
- raise OSError()
- opens.side_effect = effect
- s.setup_schedule()
- s._remove_db.assert_called_with()
- s._store = {str('__version__'): 1}
- s.setup_schedule()
- s._store.clear = Mock()
- op = s.persistence.open = Mock()
- op.return_value = s._store
- s._store[str('tz')] = 'FUNKY'
- s.setup_schedule()
- op.assert_called_with(s.schedule_filename, writeback=True)
- s._store.clear.assert_called_with()
- s._store[str('utc_enabled')] = False
- s._store.clear = Mock()
- s.setup_schedule()
- s._store.clear.assert_called_with()
- def test_get_schedule(self):
- s = create_persistent_scheduler()[0](
- schedule_filename='schedule', app=self.app,
- )
- s._store = {str('entries'): {}}
- s.schedule = {'foo': 'bar'}
- assert s.schedule == {'foo': 'bar'}
- assert s._store[str('entries')] == s.schedule
- class test_Service:
- def get_service(self):
- Scheduler, mock_shelve = create_persistent_scheduler()
- return beat.Service(app=self.app, scheduler_cls=Scheduler), mock_shelve
- def test_pickleable(self):
- s = beat.Service(app=self.app, scheduler_cls=Mock)
- assert loads(dumps(s))
- def test_start(self):
- s, sh = self.get_service()
- schedule = s.scheduler.schedule
- assert isinstance(schedule, dict)
- assert isinstance(s.scheduler, beat.Scheduler)
- scheduled = list(schedule.keys())
- for task_name in keys(sh[str('entries')]):
- assert task_name in scheduled
- s.sync()
- assert sh.closed
- assert sh.synced
- assert s._is_stopped.isSet()
- s.sync()
- s.stop(wait=False)
- assert s._is_shutdown.isSet()
- s.stop(wait=True)
- assert s._is_shutdown.isSet()
- p = s.scheduler._store
- s.scheduler._store = None
- try:
- s.scheduler.sync()
- finally:
- s.scheduler._store = p
- def test_start_embedded_process(self):
- s, sh = self.get_service()
- s._is_shutdown.set()
- s.start(embedded_process=True)
- def test_start_thread(self):
- s, sh = self.get_service()
- s._is_shutdown.set()
- s.start(embedded_process=False)
- def test_start_tick_raises_exit_error(self):
- s, sh = self.get_service()
- s.scheduler.tick_raises_exit = True
- s.start()
- assert s._is_shutdown.isSet()
- def test_start_manages_one_tick_before_shutdown(self):
- s, sh = self.get_service()
- s.scheduler.shutdown_service = s
- s.start()
- assert s._is_shutdown.isSet()
- class test_EmbeddedService:
- @skip.unless_module('_multiprocessing', name='multiprocessing')
- def xxx_start_stop_process(self):
- from billiard.process import Process
- s = beat.EmbeddedService(self.app)
- assert isinstance(s, Process)
- assert isinstance(s.service, beat.Service)
- s.service = MockService()
- class _Popen(object):
- terminated = False
- def terminate(self):
- self.terminated = True
- with patch('celery.platforms.close_open_fds'):
- s.run()
- assert s.service.started
- s._popen = _Popen()
- s.stop()
- assert s.service.stopped
- assert s._popen.terminated
- def test_start_stop_threaded(self):
- s = beat.EmbeddedService(self.app, thread=True)
- from threading import Thread
- assert isinstance(s, Thread)
- assert isinstance(s.service, beat.Service)
- s.service = MockService()
- s.run()
- assert s.service.started
- s.stop()
- assert s.service.stopped
- class test_schedule:
- def test_maybe_make_aware(self):
- x = schedule(10, app=self.app)
- x.utc_enabled = True
- d = x.maybe_make_aware(datetime.utcnow())
- assert d.tzinfo
- x.utc_enabled = False
- d2 = x.maybe_make_aware(datetime.utcnow())
- assert d2.tzinfo
- def test_to_local(self):
- x = schedule(10, app=self.app)
- x.utc_enabled = True
- d = x.to_local(datetime.utcnow())
- assert d.tzinfo is None
- x.utc_enabled = False
- d = x.to_local(datetime.utcnow())
- assert d.tzinfo
|