test_beat.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. from __future__ import absolute_import, unicode_literals
  2. import errno
  3. import pytest
  4. from datetime import datetime, timedelta
  5. from pickle import dumps, loads
  6. from case import Mock, call, patch, skip
  7. from celery import beat
  8. from celery import uuid
  9. from celery.beat import event_t
  10. from celery.five import keys, string_t
  11. from celery.schedules import schedule, crontab
  12. from celery.utils.objects import Bunch
  13. class MockShelve(dict):
  14. closed = False
  15. synced = False
  16. def close(self):
  17. self.closed = True
  18. def sync(self):
  19. self.synced = True
  20. class MockService(object):
  21. started = False
  22. stopped = False
  23. def __init__(self, *args, **kwargs):
  24. pass
  25. def start(self, **kwargs):
  26. self.started = True
  27. def stop(self, **kwargs):
  28. self.stopped = True
  29. class test_ScheduleEntry:
  30. Entry = beat.ScheduleEntry
  31. def create_entry(self, **kwargs):
  32. entry = {
  33. 'name': 'celery.unittest.add',
  34. 'schedule': timedelta(seconds=10),
  35. 'args': (2, 2),
  36. 'options': {'routing_key': 'cpu'},
  37. 'app': self.app,
  38. }
  39. return self.Entry(**dict(entry, **kwargs))
  40. def test_next(self):
  41. entry = self.create_entry(schedule=10)
  42. assert entry.last_run_at
  43. assert isinstance(entry.last_run_at, datetime)
  44. assert entry.total_run_count == 0
  45. next_run_at = entry.last_run_at + timedelta(seconds=10)
  46. next_entry = entry.next(next_run_at)
  47. assert next_entry.last_run_at >= next_run_at
  48. assert next_entry.total_run_count == 1
  49. def test_is_due(self):
  50. entry = self.create_entry(schedule=timedelta(seconds=10))
  51. assert entry.app is self.app
  52. assert entry.schedule.app is self.app
  53. due1, next_time_to_run1 = entry.is_due()
  54. assert not due1
  55. assert next_time_to_run1 > 9
  56. next_run_at = entry.last_run_at - timedelta(seconds=10)
  57. next_entry = entry.next(next_run_at)
  58. due2, next_time_to_run2 = next_entry.is_due()
  59. assert due2
  60. assert next_time_to_run2 > 9
  61. def test_repr(self):
  62. entry = self.create_entry()
  63. assert '<ScheduleEntry:' in repr(entry)
  64. def test_reduce(self):
  65. entry = self.create_entry(schedule=timedelta(seconds=10))
  66. fun, args = entry.__reduce__()
  67. res = fun(*args)
  68. assert res.schedule == entry.schedule
  69. def test_lt(self):
  70. e1 = self.create_entry(schedule=timedelta(seconds=10))
  71. e2 = self.create_entry(schedule=timedelta(seconds=2))
  72. # order doesn't matter, see comment in __lt__
  73. res1 = e1 < e2 # noqa
  74. try:
  75. res2 = e1 < object() # noqa
  76. except TypeError:
  77. pass
  78. def test_update(self):
  79. entry = self.create_entry()
  80. assert entry.schedule == timedelta(seconds=10)
  81. assert entry.args == (2, 2)
  82. assert entry.kwargs == {}
  83. assert entry.options == {'routing_key': 'cpu'}
  84. entry2 = self.create_entry(schedule=timedelta(minutes=20),
  85. args=(16, 16),
  86. kwargs={'callback': 'foo.bar.baz'},
  87. options={'routing_key': 'urgent'})
  88. entry.update(entry2)
  89. assert entry.schedule == schedule(timedelta(minutes=20))
  90. assert entry.args == (16, 16)
  91. assert entry.kwargs == {'callback': 'foo.bar.baz'}
  92. assert entry.options == {'routing_key': 'urgent'}
  93. class mScheduler(beat.Scheduler):
  94. def __init__(self, *args, **kwargs):
  95. self.sent = []
  96. beat.Scheduler.__init__(self, *args, **kwargs)
  97. def send_task(self, name=None, args=None, kwargs=None, **options):
  98. self.sent.append({'name': name,
  99. 'args': args,
  100. 'kwargs': kwargs,
  101. 'options': options})
  102. return self.app.AsyncResult(uuid())
  103. class mSchedulerSchedulingError(mScheduler):
  104. def send_task(self, *args, **kwargs):
  105. raise beat.SchedulingError('Could not apply task')
  106. class mSchedulerRuntimeError(mScheduler):
  107. def is_due(self, *args, **kwargs):
  108. raise RuntimeError('dict modified while itervalues')
  109. class mocked_schedule(schedule):
  110. def __init__(self, is_due, next_run_at):
  111. self._is_due = is_due
  112. self._next_run_at = next_run_at
  113. self.run_every = timedelta(seconds=1)
  114. self.nowfun = datetime.utcnow
  115. def is_due(self, last_run_at):
  116. return self._is_due, self._next_run_at
  117. always_due = mocked_schedule(True, 1)
  118. always_pending = mocked_schedule(False, 1)
  119. class test_Scheduler:
  120. def test_custom_schedule_dict(self):
  121. custom = {'foo': 'bar'}
  122. scheduler = mScheduler(app=self.app, schedule=custom, lazy=True)
  123. assert scheduler.data is custom
  124. def test_apply_async_uses_registered_task_instances(self):
  125. @self.app.task(shared=False)
  126. def foo():
  127. pass
  128. foo.apply_async = Mock(name='foo.apply_async')
  129. assert foo.name in foo._get_app().tasks
  130. scheduler = mScheduler(app=self.app)
  131. scheduler.apply_async(scheduler.Entry(task=foo.name, app=self.app))
  132. foo.apply_async.assert_called()
  133. def test_should_sync(self):
  134. @self.app.task(shared=False)
  135. def not_sync():
  136. pass
  137. not_sync.apply_async = Mock()
  138. s = mScheduler(app=self.app)
  139. s._do_sync = Mock()
  140. s.should_sync = Mock()
  141. s.should_sync.return_value = True
  142. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  143. s._do_sync.assert_called_with()
  144. s._do_sync = Mock()
  145. s.should_sync.return_value = False
  146. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  147. s._do_sync.assert_not_called()
  148. def test_should_sync_increments_sync_every_counter(self):
  149. self.app.conf.beat_sync_every = 2
  150. @self.app.task(shared=False)
  151. def not_sync():
  152. pass
  153. not_sync.apply_async = Mock()
  154. s = mScheduler(app=self.app)
  155. assert s.sync_every_tasks == 2
  156. s._do_sync = Mock()
  157. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  158. assert s._tasks_since_sync == 1
  159. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  160. s._do_sync.assert_called_with()
  161. self.app.conf.beat_sync_every = 0
  162. def test_sync_task_counter_resets_on_do_sync(self):
  163. self.app.conf.beat_sync_every = 1
  164. @self.app.task(shared=False)
  165. def not_sync():
  166. pass
  167. not_sync.apply_async = Mock()
  168. s = mScheduler(app=self.app)
  169. assert s.sync_every_tasks == 1
  170. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  171. assert s._tasks_since_sync == 0
  172. self.app.conf.beat_sync_every = 0
  173. @patch('celery.app.base.Celery.send_task')
  174. def test_send_task(self, send_task):
  175. b = beat.Scheduler(app=self.app)
  176. b.send_task('tasks.add', countdown=10)
  177. send_task.assert_called_with('tasks.add', countdown=10)
  178. def test_info(self):
  179. scheduler = mScheduler(app=self.app)
  180. assert isinstance(scheduler.info, string_t)
  181. def test_maybe_entry(self):
  182. s = mScheduler(app=self.app)
  183. entry = s.Entry(name='add every', task='tasks.add', app=self.app)
  184. assert s._maybe_entry(entry.name, entry) is entry
  185. assert s._maybe_entry('add every', {'task': 'tasks.add'})
  186. def test_set_schedule(self):
  187. s = mScheduler(app=self.app)
  188. s.schedule = {'foo': 'bar'}
  189. assert s.data == {'foo': 'bar'}
  190. @patch('kombu.connection.Connection.ensure_connection')
  191. def test_ensure_connection_error_handler(self, ensure):
  192. s = mScheduler(app=self.app)
  193. assert s._ensure_connected()
  194. ensure.assert_called()
  195. callback = ensure.call_args[0][0]
  196. callback(KeyError(), 5)
  197. def test_install_default_entries(self):
  198. self.app.conf.result_expires = None
  199. self.app.conf.beat_schedule = {}
  200. s = mScheduler(app=self.app)
  201. s.install_default_entries({})
  202. assert 'celery.backend_cleanup' not in s.data
  203. self.app.backend.supports_autoexpire = False
  204. self.app.conf.result_expires = 30
  205. s = mScheduler(app=self.app)
  206. s.install_default_entries({})
  207. assert 'celery.backend_cleanup' in s.data
  208. self.app.backend.supports_autoexpire = True
  209. self.app.conf.result_expires = 31
  210. s = mScheduler(app=self.app)
  211. s.install_default_entries({})
  212. assert 'celery.backend_cleanup' not in s.data
  213. def test_due_tick(self):
  214. scheduler = mScheduler(app=self.app)
  215. scheduler.add(name='test_due_tick',
  216. schedule=always_due,
  217. args=(1, 2),
  218. kwargs={'foo': 'bar'})
  219. assert scheduler.tick() == 0
  220. @patch('celery.beat.error')
  221. def test_due_tick_SchedulingError(self, error):
  222. scheduler = mSchedulerSchedulingError(app=self.app)
  223. scheduler.add(name='test_due_tick_SchedulingError',
  224. schedule=always_due)
  225. assert scheduler.tick() == 0
  226. error.assert_called()
  227. def test_pending_tick(self):
  228. scheduler = mScheduler(app=self.app)
  229. scheduler.add(name='test_pending_tick',
  230. schedule=always_pending)
  231. assert scheduler.tick() == 1 - 0.010
  232. def test_honors_max_interval(self):
  233. scheduler = mScheduler(app=self.app)
  234. maxi = scheduler.max_interval
  235. scheduler.add(name='test_honors_max_interval',
  236. schedule=mocked_schedule(False, maxi * 4))
  237. assert scheduler.tick() == maxi
  238. def test_ticks(self):
  239. scheduler = mScheduler(app=self.app)
  240. nums = [600, 300, 650, 120, 250, 36]
  241. s = {'test_ticks%s' % i: {'schedule': mocked_schedule(False, j)}
  242. for i, j in enumerate(nums)}
  243. scheduler.update_from_dict(s)
  244. assert scheduler.tick() == min(nums) - 0.010
  245. def test_ticks_schedule_change(self):
  246. # initialise schedule and check heap is not initialized
  247. scheduler = mScheduler(app=self.app)
  248. assert scheduler._heap is None
  249. # set initial schedule and check heap is updated
  250. schedule_5 = schedule(5)
  251. scheduler.add(name='test_schedule', schedule=schedule_5)
  252. scheduler.tick()
  253. assert scheduler._heap[0].entry.schedule == schedule_5
  254. # update schedule and check heap is updated
  255. schedule_10 = schedule(10)
  256. scheduler.add(name='test_schedule', schedule=schedule(10))
  257. scheduler.tick()
  258. assert scheduler._heap[0].entry.schedule == schedule_10
  259. def test_schedule_no_remain(self):
  260. scheduler = mScheduler(app=self.app)
  261. scheduler.add(name='test_schedule_no_remain',
  262. schedule=mocked_schedule(False, None))
  263. assert scheduler.tick() == scheduler.max_interval
  264. def test_interface(self):
  265. scheduler = mScheduler(app=self.app)
  266. scheduler.sync()
  267. scheduler.setup_schedule()
  268. scheduler.close()
  269. def test_merge_inplace(self):
  270. a = mScheduler(app=self.app)
  271. b = mScheduler(app=self.app)
  272. a.update_from_dict({'foo': {'schedule': mocked_schedule(True, 10)},
  273. 'bar': {'schedule': mocked_schedule(True, 20)}})
  274. b.update_from_dict({'bar': {'schedule': mocked_schedule(True, 40)},
  275. 'baz': {'schedule': mocked_schedule(True, 10)}})
  276. a.merge_inplace(b.schedule)
  277. assert 'foo' not in a.schedule
  278. assert 'baz' in a.schedule
  279. assert a.schedule['bar'].schedule._next_run_at == 40
  280. @patch('celery.beat.Scheduler._when', return_value=1)
  281. def test_populate_heap(self, _when):
  282. scheduler = mScheduler(app=self.app)
  283. scheduler.update_from_dict(
  284. {'foo': {'schedule': mocked_schedule(True, 10)}}
  285. )
  286. scheduler.populate_heap()
  287. assert scheduler._heap == [event_t(1, 5, scheduler.schedule['foo'])]
  288. def create_schedule_entry(self, schedule):
  289. entry = {
  290. 'name': 'celery.unittest.add',
  291. 'schedule': schedule,
  292. 'app': self.app,
  293. }
  294. return beat.ScheduleEntry(**dict(entry))
  295. def test_schedule_equal_schedule_vs_schedule_success(self):
  296. scheduler = beat.Scheduler(app=self.app)
  297. a = {'a': self.create_schedule_entry(schedule(5))}
  298. b = {'a': self.create_schedule_entry(schedule(5))}
  299. assert scheduler.schedules_equal(a, b)
  300. def test_schedule_equal_schedule_vs_schedule_fail(self):
  301. scheduler = beat.Scheduler(app=self.app)
  302. a = {'a': self.create_schedule_entry(schedule(5))}
  303. b = {'a': self.create_schedule_entry(schedule(10))}
  304. assert not scheduler.schedules_equal(a, b)
  305. def test_schedule_equal_crontab_vs_crontab_success(self):
  306. scheduler = beat.Scheduler(app=self.app)
  307. a = {'a': self.create_schedule_entry(crontab(minute=5))}
  308. b = {'a': self.create_schedule_entry(crontab(minute=5))}
  309. assert scheduler.schedules_equal(a, b)
  310. def test_schedule_equal_crontab_vs_crontab_fail(self):
  311. scheduler = beat.Scheduler(app=self.app)
  312. a = {'a': self.create_schedule_entry(crontab(minute=5))}
  313. b = {'a': self.create_schedule_entry(crontab(minute=10))}
  314. assert not scheduler.schedules_equal(a, b)
  315. def test_schedule_equal_crontab_vs_schedule_fail(self):
  316. scheduler = beat.Scheduler(app=self.app)
  317. a = {'a': self.create_schedule_entry(crontab(minute=5))}
  318. b = {'a': self.create_schedule_entry(schedule(5))}
  319. assert not scheduler.schedules_equal(a, b)
  320. def test_schedule_equal_different_key_fail(self):
  321. scheduler = beat.Scheduler(app=self.app)
  322. a = {'a': self.create_schedule_entry(schedule(5))}
  323. b = {'b': self.create_schedule_entry(schedule(5))}
  324. assert not scheduler.schedules_equal(a, b)
  325. def create_persistent_scheduler(shelv=None):
  326. if shelv is None:
  327. shelv = MockShelve()
  328. class MockPersistentScheduler(beat.PersistentScheduler):
  329. sh = shelv
  330. persistence = Bunch(
  331. open=lambda *a, **kw: shelv,
  332. )
  333. tick_raises_exit = False
  334. shutdown_service = None
  335. def tick(self):
  336. if self.tick_raises_exit:
  337. raise SystemExit()
  338. if self.shutdown_service:
  339. self.shutdown_service._is_shutdown.set()
  340. return 0.0
  341. return MockPersistentScheduler, shelv
  342. class test_PersistentScheduler:
  343. @patch('os.remove')
  344. def test_remove_db(self, remove):
  345. s = create_persistent_scheduler()[0](app=self.app,
  346. schedule_filename='schedule')
  347. s._remove_db()
  348. remove.assert_has_calls(
  349. [call('schedule' + suffix) for suffix in s.known_suffixes]
  350. )
  351. err = OSError()
  352. err.errno = errno.ENOENT
  353. remove.side_effect = err
  354. s._remove_db()
  355. err.errno = errno.EPERM
  356. with pytest.raises(OSError):
  357. s._remove_db()
  358. def test_setup_schedule(self):
  359. s = create_persistent_scheduler()[0](app=self.app,
  360. schedule_filename='schedule')
  361. opens = s.persistence.open = Mock()
  362. s._remove_db = Mock()
  363. def effect(*args, **kwargs):
  364. if opens.call_count > 1:
  365. return s.sh
  366. raise OSError()
  367. opens.side_effect = effect
  368. s.setup_schedule()
  369. s._remove_db.assert_called_with()
  370. s._store = {str('__version__'): 1}
  371. s.setup_schedule()
  372. s._store.clear = Mock()
  373. op = s.persistence.open = Mock()
  374. op.return_value = s._store
  375. s._store[str('tz')] = 'FUNKY'
  376. s.setup_schedule()
  377. op.assert_called_with(s.schedule_filename, writeback=True)
  378. s._store.clear.assert_called_with()
  379. s._store[str('utc_enabled')] = False
  380. s._store.clear = Mock()
  381. s.setup_schedule()
  382. s._store.clear.assert_called_with()
  383. def test_get_schedule(self):
  384. s = create_persistent_scheduler()[0](
  385. schedule_filename='schedule', app=self.app,
  386. )
  387. s._store = {str('entries'): {}}
  388. s.schedule = {'foo': 'bar'}
  389. assert s.schedule == {'foo': 'bar'}
  390. assert s._store[str('entries')] == s.schedule
  391. class test_Service:
  392. def get_service(self):
  393. Scheduler, mock_shelve = create_persistent_scheduler()
  394. return beat.Service(app=self.app, scheduler_cls=Scheduler), mock_shelve
  395. def test_pickleable(self):
  396. s = beat.Service(app=self.app, scheduler_cls=Mock)
  397. assert loads(dumps(s))
  398. def test_start(self):
  399. s, sh = self.get_service()
  400. schedule = s.scheduler.schedule
  401. assert isinstance(schedule, dict)
  402. assert isinstance(s.scheduler, beat.Scheduler)
  403. scheduled = list(schedule.keys())
  404. for task_name in keys(sh[str('entries')]):
  405. assert task_name in scheduled
  406. s.sync()
  407. assert sh.closed
  408. assert sh.synced
  409. assert s._is_stopped.isSet()
  410. s.sync()
  411. s.stop(wait=False)
  412. assert s._is_shutdown.isSet()
  413. s.stop(wait=True)
  414. assert s._is_shutdown.isSet()
  415. p = s.scheduler._store
  416. s.scheduler._store = None
  417. try:
  418. s.scheduler.sync()
  419. finally:
  420. s.scheduler._store = p
  421. def test_start_embedded_process(self):
  422. s, sh = self.get_service()
  423. s._is_shutdown.set()
  424. s.start(embedded_process=True)
  425. def test_start_thread(self):
  426. s, sh = self.get_service()
  427. s._is_shutdown.set()
  428. s.start(embedded_process=False)
  429. def test_start_tick_raises_exit_error(self):
  430. s, sh = self.get_service()
  431. s.scheduler.tick_raises_exit = True
  432. s.start()
  433. assert s._is_shutdown.isSet()
  434. def test_start_manages_one_tick_before_shutdown(self):
  435. s, sh = self.get_service()
  436. s.scheduler.shutdown_service = s
  437. s.start()
  438. assert s._is_shutdown.isSet()
  439. class test_EmbeddedService:
  440. @skip.unless_module('_multiprocessing', name='multiprocessing')
  441. def xxx_start_stop_process(self):
  442. from billiard.process import Process
  443. s = beat.EmbeddedService(self.app)
  444. assert isinstance(s, Process)
  445. assert isinstance(s.service, beat.Service)
  446. s.service = MockService()
  447. class _Popen(object):
  448. terminated = False
  449. def terminate(self):
  450. self.terminated = True
  451. with patch('celery.platforms.close_open_fds'):
  452. s.run()
  453. assert s.service.started
  454. s._popen = _Popen()
  455. s.stop()
  456. assert s.service.stopped
  457. assert s._popen.terminated
  458. def test_start_stop_threaded(self):
  459. s = beat.EmbeddedService(self.app, thread=True)
  460. from threading import Thread
  461. assert isinstance(s, Thread)
  462. assert isinstance(s.service, beat.Service)
  463. s.service = MockService()
  464. s.run()
  465. assert s.service.started
  466. s.stop()
  467. assert s.service.stopped
  468. class test_schedule:
  469. def test_maybe_make_aware(self):
  470. x = schedule(10, app=self.app)
  471. x.utc_enabled = True
  472. d = x.maybe_make_aware(datetime.utcnow())
  473. assert d.tzinfo
  474. x.utc_enabled = False
  475. d2 = x.maybe_make_aware(datetime.utcnow())
  476. assert d2.tzinfo
  477. def test_to_local(self):
  478. x = schedule(10, app=self.app)
  479. x.utc_enabled = True
  480. d = x.to_local(datetime.utcnow())
  481. assert d.tzinfo is None
  482. x.utc_enabled = False
  483. d = x.to_local(datetime.utcnow())
  484. assert d.tzinfo