test_beat.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. import errno
  2. import pytest
  3. from datetime import datetime, timedelta
  4. from pickle import dumps, loads
  5. from case import Mock, call, patch, skip
  6. from celery import beat
  7. from celery import uuid
  8. from celery.schedules import schedule
  9. from celery.utils.objects import Bunch
  10. class MockShelve(dict):
  11. closed = False
  12. synced = False
  13. def close(self):
  14. self.closed = True
  15. def sync(self):
  16. self.synced = True
  17. class MockService:
  18. started = False
  19. stopped = False
  20. def __init__(self, *args, **kwargs):
  21. pass
  22. def start(self, **kwargs):
  23. self.started = True
  24. def stop(self, **kwargs):
  25. self.stopped = True
  26. class test_ScheduleEntry:
  27. Entry = beat.ScheduleEntry
  28. def create_entry(self, **kwargs):
  29. entry = dict(
  30. name='celery.unittest.add',
  31. schedule=timedelta(seconds=10),
  32. args=(2, 2),
  33. options={'routing_key': 'cpu'},
  34. app=self.app,
  35. )
  36. return self.Entry(**dict(entry, **kwargs))
  37. def test_next(self):
  38. entry = self.create_entry(schedule=10)
  39. assert entry.last_run_at
  40. assert isinstance(entry.last_run_at, datetime)
  41. assert entry.total_run_count == 0
  42. next_run_at = entry.last_run_at + timedelta(seconds=10)
  43. next_entry = entry.next(next_run_at)
  44. assert next_entry.last_run_at >= next_run_at
  45. assert next_entry.total_run_count == 1
  46. def test_is_due(self):
  47. entry = self.create_entry(schedule=timedelta(seconds=10))
  48. assert entry.app is self.app
  49. assert entry.schedule.app is self.app
  50. due1, next_time_to_run1 = entry.is_due()
  51. assert not due1
  52. assert next_time_to_run1 > 9
  53. next_run_at = entry.last_run_at - timedelta(seconds=10)
  54. next_entry = entry.next(next_run_at)
  55. due2, next_time_to_run2 = next_entry.is_due()
  56. assert due2
  57. assert next_time_to_run2 > 9
  58. def test_repr(self):
  59. entry = self.create_entry()
  60. assert '<ScheduleEntry:' in repr(entry)
  61. def test_reduce(self):
  62. entry = self.create_entry(schedule=timedelta(seconds=10))
  63. fun, args = entry.__reduce__()
  64. res = fun(*args)
  65. assert res.schedule == entry.schedule
  66. def test_lt(self):
  67. e1 = self.create_entry(schedule=timedelta(seconds=10))
  68. e2 = self.create_entry(schedule=timedelta(seconds=2))
  69. # order doesn't matter, see comment in __lt__
  70. res1 = e1 < e2 # noqa
  71. try:
  72. res2 = e1 < object() # noqa
  73. except TypeError:
  74. pass
  75. def test_update(self):
  76. entry = self.create_entry()
  77. assert entry.schedule == timedelta(seconds=10)
  78. assert entry.args == (2, 2)
  79. assert entry.kwargs == {}
  80. assert entry.options == {'routing_key': 'cpu'}
  81. entry2 = self.create_entry(schedule=timedelta(minutes=20),
  82. args=(16, 16),
  83. kwargs={'callback': 'foo.bar.baz'},
  84. options={'routing_key': 'urgent'})
  85. entry.update(entry2)
  86. assert entry.schedule == schedule(timedelta(minutes=20))
  87. assert entry.args == (16, 16)
  88. assert entry.kwargs == {'callback': 'foo.bar.baz'}
  89. assert entry.options == {'routing_key': 'urgent'}
  90. class mScheduler(beat.Scheduler):
  91. def __init__(self, *args, **kwargs):
  92. self.sent = []
  93. beat.Scheduler.__init__(self, *args, **kwargs)
  94. def send_task(self, name=None, args=None, kwargs=None, **options):
  95. self.sent.append({'name': name,
  96. 'args': args,
  97. 'kwargs': kwargs,
  98. 'options': options})
  99. return self.app.AsyncResult(uuid())
  100. class mSchedulerSchedulingError(mScheduler):
  101. def send_task(self, *args, **kwargs):
  102. raise beat.SchedulingError('Could not apply task')
  103. class mSchedulerRuntimeError(mScheduler):
  104. def is_due(self, *args, **kwargs):
  105. raise RuntimeError('dict modified while itervalues')
  106. class mocked_schedule(schedule):
  107. def __init__(self, is_due, next_run_at):
  108. self._is_due = is_due
  109. self._next_run_at = next_run_at
  110. self.run_every = timedelta(seconds=1)
  111. self.nowfun = datetime.utcnow
  112. def is_due(self, last_run_at):
  113. return self._is_due, self._next_run_at
  114. always_due = mocked_schedule(True, 1)
  115. always_pending = mocked_schedule(False, 1)
  116. class test_Scheduler:
  117. def test_custom_schedule_dict(self):
  118. custom = {'foo': 'bar'}
  119. scheduler = mScheduler(app=self.app, schedule=custom, lazy=True)
  120. assert scheduler.data is custom
  121. def test_apply_async_uses_registered_task_instances(self):
  122. @self.app.task(shared=False)
  123. def foo():
  124. pass
  125. foo.apply_async = Mock(name='foo.apply_async')
  126. assert foo.name in foo._get_app().tasks
  127. scheduler = mScheduler(app=self.app)
  128. scheduler.apply_async(scheduler.Entry(task=foo.name, app=self.app))
  129. foo.apply_async.assert_called()
  130. def test_should_sync(self):
  131. @self.app.task(shared=False)
  132. def not_sync():
  133. pass
  134. not_sync.apply_async = Mock()
  135. s = mScheduler(app=self.app)
  136. s._do_sync = Mock()
  137. s.should_sync = Mock()
  138. s.should_sync.return_value = True
  139. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  140. s._do_sync.assert_called_with()
  141. s._do_sync = Mock()
  142. s.should_sync.return_value = False
  143. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  144. s._do_sync.assert_not_called()
  145. def test_should_sync_increments_sync_every_counter(self):
  146. self.app.conf.beat_sync_every = 2
  147. @self.app.task(shared=False)
  148. def not_sync():
  149. pass
  150. not_sync.apply_async = Mock()
  151. s = mScheduler(app=self.app)
  152. assert s.sync_every_tasks == 2
  153. s._do_sync = Mock()
  154. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  155. assert s._tasks_since_sync == 1
  156. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  157. s._do_sync.assert_called_with()
  158. self.app.conf.beat_sync_every = 0
  159. def test_sync_task_counter_resets_on_do_sync(self):
  160. self.app.conf.beat_sync_every = 1
  161. @self.app.task(shared=False)
  162. def not_sync():
  163. pass
  164. not_sync.apply_async = Mock()
  165. s = mScheduler(app=self.app)
  166. assert s.sync_every_tasks == 1
  167. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  168. assert s._tasks_since_sync == 0
  169. self.app.conf.beat_sync_every = 0
  170. @patch('celery.app.base.Celery.send_task')
  171. def test_send_task(self, send_task):
  172. b = beat.Scheduler(app=self.app)
  173. b.send_task('tasks.add', countdown=10)
  174. send_task.assert_called_with('tasks.add', countdown=10)
  175. def test_info(self):
  176. scheduler = mScheduler(app=self.app)
  177. assert isinstance(scheduler.info, str)
  178. def test_maybe_entry(self):
  179. s = mScheduler(app=self.app)
  180. entry = s.Entry(name='add every', task='tasks.add', app=self.app)
  181. assert s._maybe_entry(entry.name, entry) is entry
  182. assert s._maybe_entry('add every', {'task': 'tasks.add'})
  183. def test_set_schedule(self):
  184. s = mScheduler(app=self.app)
  185. s.schedule = {'foo': 'bar'}
  186. assert s.data == {'foo': 'bar'}
  187. @patch('kombu.connection.Connection.ensure_connection')
  188. def test_ensure_connection_error_handler(self, ensure):
  189. s = mScheduler(app=self.app)
  190. assert s._ensure_connected()
  191. ensure.assert_called()
  192. callback = ensure.call_args[0][0]
  193. callback(KeyError(), 5)
  194. def test_install_default_entries(self):
  195. self.app.conf.result_expires = None
  196. self.app.conf.beat_schedule = {}
  197. s = mScheduler(app=self.app)
  198. s.install_default_entries({})
  199. assert 'celery.backend_cleanup' not in s.data
  200. self.app.backend.supports_autoexpire = False
  201. self.app.conf.result_expires = 30
  202. s = mScheduler(app=self.app)
  203. s.install_default_entries({})
  204. assert 'celery.backend_cleanup' in s.data
  205. self.app.backend.supports_autoexpire = True
  206. self.app.conf.result_expires = 31
  207. s = mScheduler(app=self.app)
  208. s.install_default_entries({})
  209. assert 'celery.backend_cleanup' not in s.data
  210. def test_due_tick(self):
  211. scheduler = mScheduler(app=self.app)
  212. scheduler.add(name='test_due_tick',
  213. schedule=always_due,
  214. args=(1, 2),
  215. kwargs={'foo': 'bar'})
  216. assert scheduler.tick() == 0
  217. @patch('celery.beat.error')
  218. def test_due_tick_SchedulingError(self, error):
  219. scheduler = mSchedulerSchedulingError(app=self.app)
  220. scheduler.add(name='test_due_tick_SchedulingError',
  221. schedule=always_due)
  222. assert scheduler.tick() == 0
  223. error.assert_called()
  224. def test_pending_tick(self):
  225. scheduler = mScheduler(app=self.app)
  226. scheduler.add(name='test_pending_tick',
  227. schedule=always_pending)
  228. assert scheduler.tick() == 1 - 0.010
  229. def test_honors_max_interval(self):
  230. scheduler = mScheduler(app=self.app)
  231. maxi = scheduler.max_interval
  232. scheduler.add(name='test_honors_max_interval',
  233. schedule=mocked_schedule(False, maxi * 4))
  234. assert scheduler.tick() == maxi
  235. def test_ticks(self):
  236. scheduler = mScheduler(app=self.app)
  237. nums = [600, 300, 650, 120, 250, 36]
  238. s = dict(('test_ticks%s' % i,
  239. {'schedule': mocked_schedule(False, j)})
  240. for i, j in enumerate(nums))
  241. scheduler.update_from_dict(s)
  242. assert scheduler.tick() == min(nums) - 0.010
  243. def test_schedule_no_remain(self):
  244. scheduler = mScheduler(app=self.app)
  245. scheduler.add(name='test_schedule_no_remain',
  246. schedule=mocked_schedule(False, None))
  247. assert scheduler.tick() == scheduler.max_interval
  248. def test_interface(self):
  249. scheduler = mScheduler(app=self.app)
  250. scheduler.sync()
  251. scheduler.setup_schedule()
  252. scheduler.close()
  253. def test_merge_inplace(self):
  254. a = mScheduler(app=self.app)
  255. b = mScheduler(app=self.app)
  256. a.update_from_dict({'foo': {'schedule': mocked_schedule(True, 10)},
  257. 'bar': {'schedule': mocked_schedule(True, 20)}})
  258. b.update_from_dict({'bar': {'schedule': mocked_schedule(True, 40)},
  259. 'baz': {'schedule': mocked_schedule(True, 10)}})
  260. a.merge_inplace(b.schedule)
  261. assert 'foo' not in a.schedule
  262. assert 'baz' in a.schedule
  263. assert a.schedule['bar'].schedule._next_run_at == 40
  264. def create_persistent_scheduler(shelv=None):
  265. if shelv is None:
  266. shelv = MockShelve()
  267. class MockPersistentScheduler(beat.PersistentScheduler):
  268. sh = shelv
  269. persistence = Bunch(
  270. open=lambda *a, **kw: shelv,
  271. )
  272. tick_raises_exit = False
  273. shutdown_service = None
  274. def tick(self):
  275. if self.tick_raises_exit:
  276. raise SystemExit()
  277. if self.shutdown_service:
  278. self.shutdown_service._is_shutdown.set()
  279. return 0.0
  280. return MockPersistentScheduler, shelv
  281. class test_PersistentScheduler:
  282. @patch('os.remove')
  283. def test_remove_db(self, remove):
  284. s = create_persistent_scheduler()[0](app=self.app,
  285. schedule_filename='schedule')
  286. s._remove_db()
  287. remove.assert_has_calls(
  288. [call('schedule' + suffix) for suffix in s.known_suffixes]
  289. )
  290. err = OSError()
  291. err.errno = errno.ENOENT
  292. remove.side_effect = err
  293. s._remove_db()
  294. err.errno = errno.EPERM
  295. with pytest.raises(OSError):
  296. s._remove_db()
  297. def test_setup_schedule(self):
  298. s = create_persistent_scheduler()[0](app=self.app,
  299. schedule_filename='schedule')
  300. opens = s.persistence.open = Mock()
  301. s._remove_db = Mock()
  302. def effect(*args, **kwargs):
  303. if opens.call_count > 1:
  304. return s.sh
  305. raise OSError()
  306. opens.side_effect = effect
  307. s.setup_schedule()
  308. s._remove_db.assert_called_with()
  309. s._store = {str(b'__version__'): 1}
  310. s.setup_schedule()
  311. s._store.clear = Mock()
  312. op = s.persistence.open = Mock()
  313. op.return_value = s._store
  314. s._store[str(b'tz')] = 'FUNKY'
  315. s.setup_schedule()
  316. op.assert_called_with(s.schedule_filename, writeback=True)
  317. s._store.clear.assert_called_with()
  318. s._store[str(b'utc_enabled')] = False
  319. s._store.clear = Mock()
  320. s.setup_schedule()
  321. s._store.clear.assert_called_with()
  322. def test_get_schedule(self):
  323. s = create_persistent_scheduler()[0](
  324. schedule_filename='schedule', app=self.app,
  325. )
  326. s._store = {str(b'entries'): {}}
  327. s.schedule = {'foo': 'bar'}
  328. assert s.schedule == {'foo': 'bar'}
  329. assert s._store[str(b'entries')] == s.schedule
  330. class test_Service:
  331. def get_service(self):
  332. Scheduler, mock_shelve = create_persistent_scheduler()
  333. return beat.Service(app=self.app, scheduler_cls=Scheduler), mock_shelve
  334. def test_pickleable(self):
  335. s = beat.Service(app=self.app, scheduler_cls=Mock)
  336. assert loads(dumps(s))
  337. def test_start(self):
  338. s, sh = self.get_service()
  339. schedule = s.scheduler.schedule
  340. assert isinstance(schedule, dict)
  341. assert isinstance(s.scheduler, beat.Scheduler)
  342. scheduled = list(schedule.keys())
  343. for task_name in sh[str(b'entries')].keys():
  344. assert task_name in scheduled
  345. s.sync()
  346. assert sh.closed
  347. assert sh.synced
  348. assert s._is_stopped.isSet()
  349. s.sync()
  350. s.stop(wait=False)
  351. assert s._is_shutdown.isSet()
  352. s.stop(wait=True)
  353. assert s._is_shutdown.isSet()
  354. p = s.scheduler._store
  355. s.scheduler._store = None
  356. try:
  357. s.scheduler.sync()
  358. finally:
  359. s.scheduler._store = p
  360. def test_start_embedded_process(self):
  361. s, sh = self.get_service()
  362. s._is_shutdown.set()
  363. s.start(embedded_process=True)
  364. def test_start_thread(self):
  365. s, sh = self.get_service()
  366. s._is_shutdown.set()
  367. s.start(embedded_process=False)
  368. def test_start_tick_raises_exit_error(self):
  369. s, sh = self.get_service()
  370. s.scheduler.tick_raises_exit = True
  371. s.start()
  372. assert s._is_shutdown.isSet()
  373. def test_start_manages_one_tick_before_shutdown(self):
  374. s, sh = self.get_service()
  375. s.scheduler.shutdown_service = s
  376. s.start()
  377. assert s._is_shutdown.isSet()
  378. class test_EmbeddedService:
  379. @skip.unless_module('_multiprocessing', name='multiprocessing')
  380. def xxx_start_stop_process(self):
  381. from billiard.process import Process
  382. s = beat.EmbeddedService(self.app)
  383. assert isinstance(s, Process)
  384. assert isinstance(s.service, beat.Service)
  385. s.service = MockService()
  386. class _Popen:
  387. terminated = False
  388. def terminate(self):
  389. self.terminated = True
  390. with patch('celery.platforms.close_open_fds'):
  391. s.run()
  392. assert s.service.started
  393. s._popen = _Popen()
  394. s.stop()
  395. assert s.service.stopped
  396. assert s._popen.terminated
  397. def test_start_stop_threaded(self):
  398. s = beat.EmbeddedService(self.app, thread=True)
  399. from threading import Thread
  400. assert isinstance(s, Thread)
  401. assert isinstance(s.service, beat.Service)
  402. s.service = MockService()
  403. s.run()
  404. assert s.service.started
  405. s.stop()
  406. assert s.service.stopped
  407. class test_schedule:
  408. def test_maybe_make_aware(self):
  409. x = schedule(10, app=self.app)
  410. x.utc_enabled = True
  411. d = x.maybe_make_aware(datetime.utcnow())
  412. assert d.tzinfo
  413. x.utc_enabled = False
  414. d2 = x.maybe_make_aware(datetime.utcnow())
  415. assert d2.tzinfo
  416. def test_to_local(self):
  417. x = schedule(10, app=self.app)
  418. x.utc_enabled = True
  419. d = x.to_local(datetime.utcnow())
  420. assert d.tzinfo is None
  421. x.utc_enabled = False
  422. d = x.to_local(datetime.utcnow())
  423. assert d.tzinfo