test_beat.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750
  1. from __future__ import absolute_import, unicode_literals
  2. import errno
  3. from datetime import datetime, timedelta
  4. from pickle import dumps, loads
  5. import pytest
  6. from case import Mock, call, patch, skip
  7. from celery import __version__, beat, uuid
  8. from celery.beat import event_t
  9. from celery.five import keys, string_t
  10. from celery.schedules import crontab, schedule
  11. from celery.utils.objects import Bunch
  12. class MockShelve(dict):
  13. closed = False
  14. synced = False
  15. def close(self):
  16. self.closed = True
  17. def sync(self):
  18. self.synced = True
  19. class MockService(object):
  20. started = False
  21. stopped = False
  22. def __init__(self, *args, **kwargs):
  23. pass
  24. def start(self, **kwargs):
  25. self.started = True
  26. def stop(self, **kwargs):
  27. self.stopped = True
  28. class test_ScheduleEntry:
  29. Entry = beat.ScheduleEntry
  30. def create_entry(self, **kwargs):
  31. entry = {
  32. 'name': 'celery.unittest.add',
  33. 'schedule': timedelta(seconds=10),
  34. 'args': (2, 2),
  35. 'options': {'routing_key': 'cpu'},
  36. 'app': self.app,
  37. }
  38. return self.Entry(**dict(entry, **kwargs))
  39. def test_next(self):
  40. entry = self.create_entry(schedule=10)
  41. assert entry.last_run_at
  42. assert isinstance(entry.last_run_at, datetime)
  43. assert entry.total_run_count == 0
  44. next_run_at = entry.last_run_at + timedelta(seconds=10)
  45. next_entry = entry.next(next_run_at)
  46. assert next_entry.last_run_at >= next_run_at
  47. assert next_entry.total_run_count == 1
  48. def test_is_due(self):
  49. entry = self.create_entry(schedule=timedelta(seconds=10))
  50. assert entry.app is self.app
  51. assert entry.schedule.app is self.app
  52. due1, next_time_to_run1 = entry.is_due()
  53. assert not due1
  54. assert next_time_to_run1 > 9
  55. next_run_at = entry.last_run_at - timedelta(seconds=10)
  56. next_entry = entry.next(next_run_at)
  57. due2, next_time_to_run2 = next_entry.is_due()
  58. assert due2
  59. assert next_time_to_run2 > 9
  60. def test_repr(self):
  61. entry = self.create_entry()
  62. assert '<ScheduleEntry:' in repr(entry)
  63. def test_reduce(self):
  64. entry = self.create_entry(schedule=timedelta(seconds=10))
  65. fun, args = entry.__reduce__()
  66. res = fun(*args)
  67. assert res.schedule == entry.schedule
  68. def test_lt(self):
  69. e1 = self.create_entry(schedule=timedelta(seconds=10))
  70. e2 = self.create_entry(schedule=timedelta(seconds=2))
  71. # order doesn't matter, see comment in __lt__
  72. res1 = e1 < e2 # noqa
  73. try:
  74. res2 = e1 < object() # noqa
  75. except TypeError:
  76. pass
  77. def test_update(self):
  78. entry = self.create_entry()
  79. assert entry.schedule == timedelta(seconds=10)
  80. assert entry.args == (2, 2)
  81. assert entry.kwargs == {}
  82. assert entry.options == {'routing_key': 'cpu'}
  83. entry2 = self.create_entry(schedule=timedelta(minutes=20),
  84. args=(16, 16),
  85. kwargs={'callback': 'foo.bar.baz'},
  86. options={'routing_key': 'urgent'})
  87. entry.update(entry2)
  88. assert entry.schedule == schedule(timedelta(minutes=20))
  89. assert entry.args == (16, 16)
  90. assert entry.kwargs == {'callback': 'foo.bar.baz'}
  91. assert entry.options == {'routing_key': 'urgent'}
  92. class mScheduler(beat.Scheduler):
  93. def __init__(self, *args, **kwargs):
  94. self.sent = []
  95. beat.Scheduler.__init__(self, *args, **kwargs)
  96. def send_task(self, name=None, args=None, kwargs=None, **options):
  97. self.sent.append({'name': name,
  98. 'args': args,
  99. 'kwargs': kwargs,
  100. 'options': options})
  101. return self.app.AsyncResult(uuid())
  102. class mSchedulerSchedulingError(mScheduler):
  103. def send_task(self, *args, **kwargs):
  104. raise beat.SchedulingError('Could not apply task')
  105. class mSchedulerRuntimeError(mScheduler):
  106. def is_due(self, *args, **kwargs):
  107. raise RuntimeError('dict modified while itervalues')
  108. class mocked_schedule(schedule):
  109. def __init__(self, is_due, next_run_at):
  110. self._is_due = is_due
  111. self._next_run_at = next_run_at
  112. self.run_every = timedelta(seconds=1)
  113. self.nowfun = datetime.utcnow
  114. def is_due(self, last_run_at):
  115. return self._is_due, self._next_run_at
  116. always_due = mocked_schedule(True, 1)
  117. always_pending = mocked_schedule(False, 1)
  118. class test_Scheduler:
  119. def test_custom_schedule_dict(self):
  120. custom = {'foo': 'bar'}
  121. scheduler = mScheduler(app=self.app, schedule=custom, lazy=True)
  122. assert scheduler.data is custom
  123. def test_apply_async_uses_registered_task_instances(self):
  124. @self.app.task(shared=False)
  125. def foo():
  126. pass
  127. foo.apply_async = Mock(name='foo.apply_async')
  128. assert foo.name in foo._get_app().tasks
  129. scheduler = mScheduler(app=self.app)
  130. scheduler.apply_async(scheduler.Entry(task=foo.name, app=self.app))
  131. foo.apply_async.assert_called()
  132. def test_should_sync(self):
  133. @self.app.task(shared=False)
  134. def not_sync():
  135. pass
  136. not_sync.apply_async = Mock()
  137. s = mScheduler(app=self.app)
  138. s._do_sync = Mock()
  139. s.should_sync = Mock()
  140. s.should_sync.return_value = True
  141. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  142. s._do_sync.assert_called_with()
  143. s._do_sync = Mock()
  144. s.should_sync.return_value = False
  145. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  146. s._do_sync.assert_not_called()
  147. def test_should_sync_increments_sync_every_counter(self):
  148. self.app.conf.beat_sync_every = 2
  149. @self.app.task(shared=False)
  150. def not_sync():
  151. pass
  152. not_sync.apply_async = Mock()
  153. s = mScheduler(app=self.app)
  154. assert s.sync_every_tasks == 2
  155. s._do_sync = Mock()
  156. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  157. assert s._tasks_since_sync == 1
  158. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  159. s._do_sync.assert_called_with()
  160. self.app.conf.beat_sync_every = 0
  161. def test_sync_task_counter_resets_on_do_sync(self):
  162. self.app.conf.beat_sync_every = 1
  163. @self.app.task(shared=False)
  164. def not_sync():
  165. pass
  166. not_sync.apply_async = Mock()
  167. s = mScheduler(app=self.app)
  168. assert s.sync_every_tasks == 1
  169. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  170. assert s._tasks_since_sync == 0
  171. self.app.conf.beat_sync_every = 0
  172. @patch('celery.app.base.Celery.send_task')
  173. def test_send_task(self, send_task):
  174. b = beat.Scheduler(app=self.app)
  175. b.send_task('tasks.add', countdown=10)
  176. send_task.assert_called_with('tasks.add', countdown=10)
  177. def test_info(self):
  178. scheduler = mScheduler(app=self.app)
  179. assert isinstance(scheduler.info, string_t)
  180. def test_maybe_entry(self):
  181. s = mScheduler(app=self.app)
  182. entry = s.Entry(name='add every', task='tasks.add', app=self.app)
  183. assert s._maybe_entry(entry.name, entry) is entry
  184. assert s._maybe_entry('add every', {'task': 'tasks.add'})
  185. def test_set_schedule(self):
  186. s = mScheduler(app=self.app)
  187. s.schedule = {'foo': 'bar'}
  188. assert s.data == {'foo': 'bar'}
  189. @patch('kombu.connection.Connection.ensure_connection')
  190. def test_ensure_connection_error_handler(self, ensure):
  191. s = mScheduler(app=self.app)
  192. assert s._ensure_connected()
  193. ensure.assert_called()
  194. callback = ensure.call_args[0][0]
  195. callback(KeyError(), 5)
  196. def test_install_default_entries(self):
  197. self.app.conf.result_expires = None
  198. self.app.conf.beat_schedule = {}
  199. s = mScheduler(app=self.app)
  200. s.install_default_entries({})
  201. assert 'celery.backend_cleanup' not in s.data
  202. self.app.backend.supports_autoexpire = False
  203. self.app.conf.result_expires = 30
  204. s = mScheduler(app=self.app)
  205. s.install_default_entries({})
  206. assert 'celery.backend_cleanup' in s.data
  207. self.app.backend.supports_autoexpire = True
  208. self.app.conf.result_expires = 31
  209. s = mScheduler(app=self.app)
  210. s.install_default_entries({})
  211. assert 'celery.backend_cleanup' not in s.data
  212. def test_due_tick(self):
  213. scheduler = mScheduler(app=self.app)
  214. scheduler.add(name='test_due_tick',
  215. schedule=always_due,
  216. args=(1, 2),
  217. kwargs={'foo': 'bar'})
  218. assert scheduler.tick() == 0
  219. @patch('celery.beat.error')
  220. def test_due_tick_SchedulingError(self, error):
  221. scheduler = mSchedulerSchedulingError(app=self.app)
  222. scheduler.add(name='test_due_tick_SchedulingError',
  223. schedule=always_due)
  224. assert scheduler.tick() == 0
  225. error.assert_called()
  226. def test_pending_tick(self):
  227. scheduler = mScheduler(app=self.app)
  228. scheduler.add(name='test_pending_tick',
  229. schedule=always_pending)
  230. assert scheduler.tick() == 1 - 0.010
  231. def test_honors_max_interval(self):
  232. scheduler = mScheduler(app=self.app)
  233. maxi = scheduler.max_interval
  234. scheduler.add(name='test_honors_max_interval',
  235. schedule=mocked_schedule(False, maxi * 4))
  236. assert scheduler.tick() == maxi
  237. def test_ticks(self):
  238. scheduler = mScheduler(app=self.app)
  239. nums = [600, 300, 650, 120, 250, 36]
  240. s = {'test_ticks%s' % i: {'schedule': mocked_schedule(False, j)}
  241. for i, j in enumerate(nums)}
  242. scheduler.update_from_dict(s)
  243. assert scheduler.tick() == min(nums) - 0.010
  244. def test_ticks_microseconds(self):
  245. scheduler = mScheduler(app=self.app)
  246. now_ts = 1514797200.2
  247. now = datetime.fromtimestamp(now_ts)
  248. schedule_half = schedule(timedelta(seconds=0.5), nowfun=lambda: now)
  249. scheduler.add(name='half_second_schedule', schedule=schedule_half)
  250. scheduler.tick()
  251. # ensure those 0.2 seconds on now_ts don't get dropped
  252. expected_time = now_ts + 0.5 - 0.010
  253. assert scheduler._heap[0].time == expected_time
  254. def test_ticks_schedule_change(self):
  255. # initialise schedule and check heap is not initialized
  256. scheduler = mScheduler(app=self.app)
  257. assert scheduler._heap is None
  258. # set initial schedule and check heap is updated
  259. schedule_5 = schedule(5)
  260. scheduler.add(name='test_schedule', schedule=schedule_5)
  261. scheduler.tick()
  262. assert scheduler._heap[0].entry.schedule == schedule_5
  263. # update schedule and check heap is updated
  264. schedule_10 = schedule(10)
  265. scheduler.add(name='test_schedule', schedule=schedule(10))
  266. scheduler.tick()
  267. assert scheduler._heap[0].entry.schedule == schedule_10
  268. def test_schedule_no_remain(self):
  269. scheduler = mScheduler(app=self.app)
  270. scheduler.add(name='test_schedule_no_remain',
  271. schedule=mocked_schedule(False, None))
  272. assert scheduler.tick() == scheduler.max_interval
  273. def test_interface(self):
  274. scheduler = mScheduler(app=self.app)
  275. scheduler.sync()
  276. scheduler.setup_schedule()
  277. scheduler.close()
  278. def test_merge_inplace(self):
  279. a = mScheduler(app=self.app)
  280. b = mScheduler(app=self.app)
  281. a.update_from_dict({'foo': {'schedule': mocked_schedule(True, 10)},
  282. 'bar': {'schedule': mocked_schedule(True, 20)}})
  283. b.update_from_dict({'bar': {'schedule': mocked_schedule(True, 40)},
  284. 'baz': {'schedule': mocked_schedule(True, 10)}})
  285. a.merge_inplace(b.schedule)
  286. assert 'foo' not in a.schedule
  287. assert 'baz' in a.schedule
  288. assert a.schedule['bar'].schedule._next_run_at == 40
  289. @patch('celery.beat.Scheduler._when', return_value=1)
  290. def test_populate_heap(self, _when):
  291. scheduler = mScheduler(app=self.app)
  292. scheduler.update_from_dict(
  293. {'foo': {'schedule': mocked_schedule(True, 10)}}
  294. )
  295. scheduler.populate_heap()
  296. assert scheduler._heap == [event_t(1, 5, scheduler.schedule['foo'])]
  297. def create_schedule_entry(self, schedule=None, args=(), kwargs={},
  298. options={}, task=None):
  299. entry = {
  300. 'name': 'celery.unittest.add',
  301. 'schedule': schedule,
  302. 'app': self.app,
  303. 'args': args,
  304. 'kwargs': kwargs,
  305. 'options': options,
  306. 'task': task
  307. }
  308. return beat.ScheduleEntry(**dict(entry))
  309. def test_schedule_equal_schedule_vs_schedule_success(self):
  310. scheduler = beat.Scheduler(app=self.app)
  311. a = {'a': self.create_schedule_entry(schedule=schedule(5))}
  312. b = {'a': self.create_schedule_entry(schedule=schedule(5))}
  313. assert scheduler.schedules_equal(a, b)
  314. def test_schedule_equal_schedule_vs_schedule_fail(self):
  315. scheduler = beat.Scheduler(app=self.app)
  316. a = {'a': self.create_schedule_entry(schedule=schedule(5))}
  317. b = {'a': self.create_schedule_entry(schedule=schedule(10))}
  318. assert not scheduler.schedules_equal(a, b)
  319. def test_schedule_equal_crontab_vs_crontab_success(self):
  320. scheduler = beat.Scheduler(app=self.app)
  321. a = {'a': self.create_schedule_entry(schedule=crontab(minute=5))}
  322. b = {'a': self.create_schedule_entry(schedule=crontab(minute=5))}
  323. assert scheduler.schedules_equal(a, b)
  324. def test_schedule_equal_crontab_vs_crontab_fail(self):
  325. scheduler = beat.Scheduler(app=self.app)
  326. a = {'a': self.create_schedule_entry(schedule=crontab(minute=5))}
  327. b = {'a': self.create_schedule_entry(schedule=crontab(minute=10))}
  328. assert not scheduler.schedules_equal(a, b)
  329. def test_schedule_equal_crontab_vs_schedule_fail(self):
  330. scheduler = beat.Scheduler(app=self.app)
  331. a = {'a': self.create_schedule_entry(schedule=crontab(minute=5))}
  332. b = {'a': self.create_schedule_entry(schedule=schedule(5))}
  333. assert not scheduler.schedules_equal(a, b)
  334. def test_schedule_equal_different_key_fail(self):
  335. scheduler = beat.Scheduler(app=self.app)
  336. a = {'a': self.create_schedule_entry(schedule=schedule(5))}
  337. b = {'b': self.create_schedule_entry(schedule=schedule(5))}
  338. assert not scheduler.schedules_equal(a, b)
  339. def test_schedule_equal_args_vs_args_success(self):
  340. scheduler = beat.Scheduler(app=self.app)
  341. a = {'a': self.create_schedule_entry(args='a')}
  342. b = {'a': self.create_schedule_entry(args='a')}
  343. assert scheduler.schedules_equal(a, b)
  344. def test_schedule_equal_args_vs_args_fail(self):
  345. scheduler = beat.Scheduler(app=self.app)
  346. a = {'a': self.create_schedule_entry(args='a')}
  347. b = {'a': self.create_schedule_entry(args='b')}
  348. assert not scheduler.schedules_equal(a, b)
  349. def test_schedule_equal_kwargs_vs_kwargs_success(self):
  350. scheduler = beat.Scheduler(app=self.app)
  351. a = {'a': self.create_schedule_entry(kwargs={'a': 'a'})}
  352. b = {'a': self.create_schedule_entry(kwargs={'a': 'a'})}
  353. assert scheduler.schedules_equal(a, b)
  354. def test_schedule_equal_kwargs_vs_kwargs_fail(self):
  355. scheduler = beat.Scheduler(app=self.app)
  356. a = {'a': self.create_schedule_entry(kwargs={'a': 'a'})}
  357. b = {'a': self.create_schedule_entry(kwargs={'b': 'b'})}
  358. assert not scheduler.schedules_equal(a, b)
  359. def test_schedule_equal_options_vs_options_success(self):
  360. scheduler = beat.Scheduler(app=self.app)
  361. a = {'a': self.create_schedule_entry(options={'a': 'a'})}
  362. b = {'a': self.create_schedule_entry(options={'a': 'a'})}
  363. assert scheduler.schedules_equal(a, b)
  364. def test_schedule_equal_options_vs_options_fail(self):
  365. scheduler = beat.Scheduler(app=self.app)
  366. a = {'a': self.create_schedule_entry(options={'a': 'a'})}
  367. b = {'a': self.create_schedule_entry(options={'b': 'b'})}
  368. assert not scheduler.schedules_equal(a, b)
  369. def test_schedule_equal_task_vs_task_success(self):
  370. scheduler = beat.Scheduler(app=self.app)
  371. a = {'a': self.create_schedule_entry(task='a')}
  372. b = {'a': self.create_schedule_entry(task='a')}
  373. assert scheduler.schedules_equal(a, b)
  374. def test_schedule_equal_task_vs_task_fail(self):
  375. scheduler = beat.Scheduler(app=self.app)
  376. a = {'a': self.create_schedule_entry(task='a')}
  377. b = {'a': self.create_schedule_entry(task='b')}
  378. assert not scheduler.schedules_equal(a, b)
  379. def create_persistent_scheduler(shelv=None):
  380. if shelv is None:
  381. shelv = MockShelve()
  382. class MockPersistentScheduler(beat.PersistentScheduler):
  383. sh = shelv
  384. persistence = Bunch(
  385. open=lambda *a, **kw: shelv,
  386. )
  387. tick_raises_exit = False
  388. shutdown_service = None
  389. def tick(self):
  390. if self.tick_raises_exit:
  391. raise SystemExit()
  392. if self.shutdown_service:
  393. self.shutdown_service._is_shutdown.set()
  394. return 0.0
  395. return MockPersistentScheduler, shelv
  396. def create_persistent_scheduler_w_call_logging(shelv=None):
  397. if shelv is None:
  398. shelv = MockShelve()
  399. class MockPersistentScheduler(beat.PersistentScheduler):
  400. sh = shelv
  401. persistence = Bunch(
  402. open=lambda *a, **kw: shelv,
  403. )
  404. def __init__(self, *args, **kwargs):
  405. self.sent = []
  406. beat.PersistentScheduler.__init__(self, *args, **kwargs)
  407. def send_task(self, task=None, args=None, kwargs=None, **options):
  408. self.sent.append({'task': task,
  409. 'args': args,
  410. 'kwargs': kwargs,
  411. 'options': options})
  412. return self.app.AsyncResult(uuid())
  413. return MockPersistentScheduler, shelv
  414. class test_PersistentScheduler:
  415. @patch('os.remove')
  416. def test_remove_db(self, remove):
  417. s = create_persistent_scheduler()[0](app=self.app,
  418. schedule_filename='schedule')
  419. s._remove_db()
  420. remove.assert_has_calls(
  421. [call('schedule' + suffix) for suffix in s.known_suffixes]
  422. )
  423. err = OSError()
  424. err.errno = errno.ENOENT
  425. remove.side_effect = err
  426. s._remove_db()
  427. err.errno = errno.EPERM
  428. with pytest.raises(OSError):
  429. s._remove_db()
  430. def test_setup_schedule(self):
  431. s = create_persistent_scheduler()[0](app=self.app,
  432. schedule_filename='schedule')
  433. opens = s.persistence.open = Mock()
  434. s._remove_db = Mock()
  435. def effect(*args, **kwargs):
  436. if opens.call_count > 1:
  437. return s.sh
  438. raise OSError()
  439. opens.side_effect = effect
  440. s.setup_schedule()
  441. s._remove_db.assert_called_with()
  442. s._store = {str('__version__'): 1}
  443. s.setup_schedule()
  444. s._store.clear = Mock()
  445. op = s.persistence.open = Mock()
  446. op.return_value = s._store
  447. s._store[str('tz')] = 'FUNKY'
  448. s.setup_schedule()
  449. op.assert_called_with(s.schedule_filename, writeback=True)
  450. s._store.clear.assert_called_with()
  451. s._store[str('utc_enabled')] = False
  452. s._store.clear = Mock()
  453. s.setup_schedule()
  454. s._store.clear.assert_called_with()
  455. def test_get_schedule(self):
  456. s = create_persistent_scheduler()[0](
  457. schedule_filename='schedule', app=self.app,
  458. )
  459. s._store = {str('entries'): {}}
  460. s.schedule = {'foo': 'bar'}
  461. assert s.schedule == {'foo': 'bar'}
  462. assert s._store[str('entries')] == s.schedule
  463. def test_run_all_due_tasks_after_restart(self):
  464. scheduler_class, shelve = create_persistent_scheduler_w_call_logging()
  465. shelve['tz'] = 'UTC'
  466. shelve['utc_enabled'] = True
  467. shelve['__version__'] = __version__
  468. cur_seconds = 20
  469. def now_func():
  470. return datetime(2018, 1, 1, 1, 11, cur_seconds)
  471. app_schedule = {
  472. 'first_missed': {'schedule': crontab(
  473. minute='*/10', nowfun=now_func), 'task': 'first_missed'},
  474. 'second_missed': {'schedule': crontab(
  475. minute='*/1', nowfun=now_func), 'task': 'second_missed'},
  476. 'non_missed': {'schedule': crontab(
  477. minute='*/13', nowfun=now_func), 'task': 'non_missed'}
  478. }
  479. shelve['entries'] = {
  480. 'first_missed': beat.ScheduleEntry(
  481. 'first_missed', 'first_missed',
  482. last_run_at=now_func() - timedelta(minutes=2),
  483. total_run_count=10,
  484. schedule=app_schedule['first_missed']['schedule']),
  485. 'second_missed': beat.ScheduleEntry(
  486. 'second_missed', 'second_missed',
  487. last_run_at=now_func() - timedelta(minutes=2),
  488. total_run_count=10,
  489. schedule=app_schedule['second_missed']['schedule']),
  490. 'non_missed': beat.ScheduleEntry(
  491. 'non_missed', 'non_missed',
  492. last_run_at=now_func() - timedelta(minutes=2),
  493. total_run_count=10,
  494. schedule=app_schedule['non_missed']['schedule']),
  495. }
  496. self.app.conf.beat_schedule = app_schedule
  497. scheduler = scheduler_class(self.app)
  498. max_iter_number = 5
  499. for i in range(max_iter_number):
  500. delay = scheduler.tick()
  501. if delay > 0:
  502. break
  503. assert {'first_missed', 'second_missed'} == {
  504. item['task'] for item in scheduler.sent}
  505. # ensure next call on the beginning of next min
  506. assert abs(60 - cur_seconds - delay) < 1
  507. class test_Service:
  508. def get_service(self):
  509. Scheduler, mock_shelve = create_persistent_scheduler()
  510. return beat.Service(app=self.app, scheduler_cls=Scheduler), mock_shelve
  511. def test_pickleable(self):
  512. s = beat.Service(app=self.app, scheduler_cls=Mock)
  513. assert loads(dumps(s))
  514. def test_start(self):
  515. s, sh = self.get_service()
  516. schedule = s.scheduler.schedule
  517. assert isinstance(schedule, dict)
  518. assert isinstance(s.scheduler, beat.Scheduler)
  519. scheduled = list(schedule.keys())
  520. for task_name in keys(sh[str('entries')]):
  521. assert task_name in scheduled
  522. s.sync()
  523. assert sh.closed
  524. assert sh.synced
  525. assert s._is_stopped.isSet()
  526. s.sync()
  527. s.stop(wait=False)
  528. assert s._is_shutdown.isSet()
  529. s.stop(wait=True)
  530. assert s._is_shutdown.isSet()
  531. p = s.scheduler._store
  532. s.scheduler._store = None
  533. try:
  534. s.scheduler.sync()
  535. finally:
  536. s.scheduler._store = p
  537. def test_start_embedded_process(self):
  538. s, sh = self.get_service()
  539. s._is_shutdown.set()
  540. s.start(embedded_process=True)
  541. def test_start_thread(self):
  542. s, sh = self.get_service()
  543. s._is_shutdown.set()
  544. s.start(embedded_process=False)
  545. def test_start_tick_raises_exit_error(self):
  546. s, sh = self.get_service()
  547. s.scheduler.tick_raises_exit = True
  548. s.start()
  549. assert s._is_shutdown.isSet()
  550. def test_start_manages_one_tick_before_shutdown(self):
  551. s, sh = self.get_service()
  552. s.scheduler.shutdown_service = s
  553. s.start()
  554. assert s._is_shutdown.isSet()
  555. class test_EmbeddedService:
  556. @skip.unless_module('_multiprocessing', name='multiprocessing')
  557. def xxx_start_stop_process(self):
  558. from billiard.process import Process
  559. s = beat.EmbeddedService(self.app)
  560. assert isinstance(s, Process)
  561. assert isinstance(s.service, beat.Service)
  562. s.service = MockService()
  563. class _Popen(object):
  564. terminated = False
  565. def terminate(self):
  566. self.terminated = True
  567. with patch('celery.platforms.close_open_fds'):
  568. s.run()
  569. assert s.service.started
  570. s._popen = _Popen()
  571. s.stop()
  572. assert s.service.stopped
  573. assert s._popen.terminated
  574. def test_start_stop_threaded(self):
  575. s = beat.EmbeddedService(self.app, thread=True)
  576. from threading import Thread
  577. assert isinstance(s, Thread)
  578. assert isinstance(s.service, beat.Service)
  579. s.service = MockService()
  580. s.run()
  581. assert s.service.started
  582. s.stop()
  583. assert s.service.stopped
  584. class test_schedule:
  585. def test_maybe_make_aware(self):
  586. x = schedule(10, app=self.app)
  587. x.utc_enabled = True
  588. d = x.maybe_make_aware(datetime.utcnow())
  589. assert d.tzinfo
  590. x.utc_enabled = False
  591. d2 = x.maybe_make_aware(datetime.utcnow())
  592. assert d2.tzinfo
  593. def test_to_local(self):
  594. x = schedule(10, app=self.app)
  595. x.utc_enabled = True
  596. d = x.to_local(datetime.utcnow())
  597. assert d.tzinfo is None
  598. x.utc_enabled = False
  599. d = x.to_local(datetime.utcnow())
  600. assert d.tzinfo