test_beat.py 17 KB


  1. from __future__ import absolute_import
  2. import errno
  3. from datetime import datetime, timedelta
  4. from pickle import dumps, loads
  5. from celery import beat
  6. from celery.five import keys, string_t
  7. from celery.schedules import schedule
  8. from celery.utils import uuid
  9. from celery.tests.case import AppCase, Mock, SkipTest, call, patch
  10. class Object(object):
  11. pass
  12. class MockShelve(dict):
  13. closed = False
  14. synced = False
  15. def close(self):
  16. self.closed = True
  17. def sync(self):
  18. self.synced = True
  19. class MockService(object):
  20. started = False
  21. stopped = False
  22. def __init__(self, *args, **kwargs):
  23. pass
  24. def start(self, **kwargs):
  25. self.started = True
  26. def stop(self, **kwargs):
  27. self.stopped = True
  28. class test_ScheduleEntry(AppCase):
  29. Entry = beat.ScheduleEntry
  30. def create_entry(self, **kwargs):
  31. entry = dict(
  32. name='celery.unittest.add',
  33. schedule=timedelta(seconds=10),
  34. args=(2, 2),
  35. options={'routing_key': 'cpu'},
  36. app=self.app,
  37. )
  38. return self.Entry(**dict(entry, **kwargs))
  39. def test_next(self):
  40. entry = self.create_entry(schedule=10)
  41. self.assertTrue(entry.last_run_at)
  42. self.assertIsInstance(entry.last_run_at, datetime)
  43. self.assertEqual(entry.total_run_count, 0)
  44. next_run_at = entry.last_run_at + timedelta(seconds=10)
  45. next_entry = entry.next(next_run_at)
  46. self.assertGreaterEqual(next_entry.last_run_at, next_run_at)
  47. self.assertEqual(next_entry.total_run_count, 1)
  48. def test_is_due(self):
  49. entry = self.create_entry(schedule=timedelta(seconds=10))
  50. self.assertIs(entry.app, self.app)
  51. self.assertIs(entry.schedule.app, self.app)
  52. due1, next_time_to_run1 = entry.is_due()
  53. self.assertFalse(due1)
  54. self.assertGreater(next_time_to_run1, 9)
  55. next_run_at = entry.last_run_at - timedelta(seconds=10)
  56. next_entry = entry.next(next_run_at)
  57. due2, next_time_to_run2 = next_entry.is_due()
  58. self.assertTrue(due2)
  59. self.assertGreater(next_time_to_run2, 9)
  60. def test_repr(self):
  61. entry = self.create_entry()
  62. self.assertIn('<Entry:', repr(entry))
  63. def test_update(self):
  64. entry = self.create_entry()
  65. self.assertEqual(entry.schedule, timedelta(seconds=10))
  66. self.assertTupleEqual(entry.args, (2, 2))
  67. self.assertDictEqual(entry.kwargs, {})
  68. self.assertDictEqual(entry.options, {'routing_key': 'cpu'})
  69. entry2 = self.create_entry(schedule=timedelta(minutes=20),
  70. args=(16, 16),
  71. kwargs={'callback': 'foo.bar.baz'},
  72. options={'routing_key': 'urgent'})
  73. entry.update(entry2)
  74. self.assertEqual(entry.schedule, schedule(timedelta(minutes=20)))
  75. self.assertTupleEqual(entry.args, (16, 16))
  76. self.assertDictEqual(entry.kwargs, {'callback': 'foo.bar.baz'})
  77. self.assertDictEqual(entry.options, {'routing_key': 'urgent'})
  78. class mScheduler(beat.Scheduler):
  79. def __init__(self, *args, **kwargs):
  80. self.sent = []
  81. beat.Scheduler.__init__(self, *args, **kwargs)
  82. def send_task(self, name=None, args=None, kwargs=None, **options):
  83. self.sent.append({'name': name,
  84. 'args': args,
  85. 'kwargs': kwargs,
  86. 'options': options})
  87. return self.app.AsyncResult(uuid())
  88. class mSchedulerSchedulingError(mScheduler):
  89. def send_task(self, *args, **kwargs):
  90. raise beat.SchedulingError('Could not apply task')
  91. class mSchedulerRuntimeError(mScheduler):
  92. def is_due(self, *args, **kwargs):
  93. raise RuntimeError('dict modified while itervalues')
  94. class mocked_schedule(schedule):
  95. def __init__(self, is_due, next_run_at):
  96. self._is_due = is_due
  97. self._next_run_at = next_run_at
  98. self.run_every = timedelta(seconds=1)
  99. self.nowfun = datetime.utcnow
  100. def is_due(self, last_run_at):
  101. return self._is_due, self._next_run_at
  102. always_due = mocked_schedule(True, 1)
  103. always_pending = mocked_schedule(False, 1)
  104. class test_Scheduler(AppCase):
  105. def test_custom_schedule_dict(self):
  106. custom = {'foo': 'bar'}
  107. scheduler = mScheduler(app=self.app, schedule=custom, lazy=True)
  108. self.assertIs(scheduler.data, custom)
  109. def test_apply_async_uses_registered_task_instances(self):
  110. @self.app.task(shared=False)
  111. def foo():
  112. pass
  113. foo.apply_async = Mock(name='foo.apply_async')
  114. assert foo.name in foo._get_app().tasks
  115. scheduler = mScheduler(app=self.app)
  116. scheduler.apply_async(scheduler.Entry(task=foo.name, app=self.app))
  117. self.assertTrue(foo.apply_async.called)
  118. def test_should_sync(self):
  119. @self.app.task(shared=False)
  120. def not_sync():
  121. pass
  122. not_sync.apply_async = Mock()
  123. s = mScheduler(app=self.app)
  124. s._do_sync = Mock()
  125. s.should_sync = Mock()
  126. s.should_sync.return_value = True
  127. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  128. s._do_sync.assert_called_with()
  129. s._do_sync = Mock()
  130. s.should_sync.return_value = False
  131. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  132. self.assertFalse(s._do_sync.called)
  133. def test_should_sync_increments_sync_every_counter(self):
  134. self.app.conf.CELERYBEAT_SYNC_EVERY = 2
  135. @self.app.task(shared=False)
  136. def not_sync():
  137. pass
  138. not_sync.apply_async = Mock()
  139. s = mScheduler(app=self.app)
  140. self.assertEqual(s.sync_every_tasks, 2)
  141. s._do_sync = Mock()
  142. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  143. self.assertEqual(s._tasks_since_sync, 1)
  144. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  145. s._do_sync.assert_called_with()
  146. self.app.conf.CELERYBEAT_SYNC_EVERY = 0
  147. def test_sync_task_counter_resets_on_do_sync(self):
  148. self.app.conf.CELERYBEAT_SYNC_EVERY = 1
  149. @self.app.task(shared=False)
  150. def not_sync():
  151. pass
  152. not_sync.apply_async = Mock()
  153. s = mScheduler(app=self.app)
  154. self.assertEqual(s.sync_every_tasks, 1)
  155. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  156. self.assertEqual(s._tasks_since_sync, 0)
  157. self.app.conf.CELERYBEAT_SYNC_EVERY = 0
  158. @patch('celery.app.base.Celery.send_task')
  159. def test_send_task(self, send_task):
  160. b = beat.Scheduler(app=self.app)
  161. b.send_task('tasks.add', countdown=10)
  162. send_task.assert_called_with('tasks.add', countdown=10)
  163. def test_info(self):
  164. scheduler = mScheduler(app=self.app)
  165. self.assertIsInstance(scheduler.info, string_t)
  166. def test_maybe_entry(self):
  167. s = mScheduler(app=self.app)
  168. entry = s.Entry(name='add every', task='tasks.add', app=self.app)
  169. self.assertIs(s._maybe_entry(entry.name, entry), entry)
  170. self.assertTrue(s._maybe_entry('add every', {
  171. 'task': 'tasks.add',
  172. }))
  173. def test_set_schedule(self):
  174. s = mScheduler(app=self.app)
  175. s.schedule = {'foo': 'bar'}
  176. self.assertEqual(s.data, {'foo': 'bar'})
  177. @patch('kombu.connection.Connection.ensure_connection')
  178. def test_ensure_connection_error_handler(self, ensure):
  179. s = mScheduler(app=self.app)
  180. self.assertTrue(s._ensure_connected())
  181. self.assertTrue(ensure.called)
  182. callback = ensure.call_args[0][0]
  183. callback(KeyError(), 5)
  184. def test_install_default_entries(self):
  185. self.app.conf.CELERY_TASK_RESULT_EXPIRES = None
  186. self.app.conf.CELERYBEAT_SCHEDULE = {}
  187. s = mScheduler(app=self.app)
  188. s.install_default_entries({})
  189. self.assertNotIn('celery.backend_cleanup', s.data)
  190. self.app.backend.supports_autoexpire = False
  191. self.app.conf.CELERY_TASK_RESULT_EXPIRES = 30
  192. s = mScheduler(app=self.app)
  193. s.install_default_entries({})
  194. self.assertIn('celery.backend_cleanup', s.data)
  195. self.app.backend.supports_autoexpire = True
  196. self.app.conf.CELERY_TASK_RESULT_EXPIRES = 31
  197. s = mScheduler(app=self.app)
  198. s.install_default_entries({})
  199. self.assertNotIn('celery.backend_cleanup', s.data)
  200. def test_due_tick(self):
  201. scheduler = mScheduler(app=self.app)
  202. scheduler.add(name='test_due_tick',
  203. schedule=always_due,
  204. args=(1, 2),
  205. kwargs={'foo': 'bar'})
  206. self.assertEqual(scheduler.tick(), 0)
  207. @patch('celery.beat.error')
  208. def test_due_tick_SchedulingError(self, error):
  209. scheduler = mSchedulerSchedulingError(app=self.app)
  210. scheduler.add(name='test_due_tick_SchedulingError',
  211. schedule=always_due)
  212. self.assertEqual(scheduler.tick(), 0)
  213. self.assertTrue(error.called)
  214. def test_pending_tick(self):
  215. scheduler = mScheduler(app=self.app)
  216. scheduler.add(name='test_pending_tick',
  217. schedule=always_pending)
  218. self.assertEqual(scheduler.tick(), 1 - 0.010)
  219. def test_honors_max_interval(self):
  220. scheduler = mScheduler(app=self.app)
  221. maxi = scheduler.max_interval
  222. scheduler.add(name='test_honors_max_interval',
  223. schedule=mocked_schedule(False, maxi * 4))
  224. self.assertEqual(scheduler.tick(), maxi)
  225. def test_ticks(self):
  226. scheduler = mScheduler(app=self.app)
  227. nums = [600, 300, 650, 120, 250, 36]
  228. s = dict(('test_ticks%s' % i,
  229. {'schedule': mocked_schedule(False, j)})
  230. for i, j in enumerate(nums))
  231. scheduler.update_from_dict(s)
  232. self.assertEqual(scheduler.tick(), min(nums) - 0.010)
  233. def test_schedule_no_remain(self):
  234. scheduler = mScheduler(app=self.app)
  235. scheduler.add(name='test_schedule_no_remain',
  236. schedule=mocked_schedule(False, None))
  237. self.assertEqual(scheduler.tick(), scheduler.max_interval)
  238. def test_interface(self):
  239. scheduler = mScheduler(app=self.app)
  240. scheduler.sync()
  241. scheduler.setup_schedule()
  242. scheduler.close()
  243. def test_merge_inplace(self):
  244. a = mScheduler(app=self.app)
  245. b = mScheduler(app=self.app)
  246. a.update_from_dict({'foo': {'schedule': mocked_schedule(True, 10)},
  247. 'bar': {'schedule': mocked_schedule(True, 20)}})
  248. b.update_from_dict({'bar': {'schedule': mocked_schedule(True, 40)},
  249. 'baz': {'schedule': mocked_schedule(True, 10)}})
  250. a.merge_inplace(b.schedule)
  251. self.assertNotIn('foo', a.schedule)
  252. self.assertIn('baz', a.schedule)
  253. self.assertEqual(a.schedule['bar'].schedule._next_run_at, 40)
  254. def create_persistent_scheduler(shelv=None):
  255. if shelv is None:
  256. shelv = MockShelve()
  257. class MockPersistentScheduler(beat.PersistentScheduler):
  258. sh = shelv
  259. persistence = Object()
  260. persistence.open = lambda *a, **kw: shelv
  261. tick_raises_exit = False
  262. shutdown_service = None
  263. def tick(self):
  264. if self.tick_raises_exit:
  265. raise SystemExit()
  266. if self.shutdown_service:
  267. self.shutdown_service._is_shutdown.set()
  268. return 0.0
  269. return MockPersistentScheduler, shelv
  270. class test_PersistentScheduler(AppCase):
  271. @patch('os.remove')
  272. def test_remove_db(self, remove):
  273. s = create_persistent_scheduler()[0](app=self.app,
  274. schedule_filename='schedule')
  275. s._remove_db()
  276. remove.assert_has_calls(
  277. [call('schedule' + suffix) for suffix in s.known_suffixes]
  278. )
  279. err = OSError()
  280. err.errno = errno.ENOENT
  281. remove.side_effect = err
  282. s._remove_db()
  283. err.errno = errno.EPERM
  284. with self.assertRaises(OSError):
  285. s._remove_db()
  286. def test_setup_schedule(self):
  287. s = create_persistent_scheduler()[0](app=self.app,
  288. schedule_filename='schedule')
  289. opens = s.persistence.open = Mock()
  290. s._remove_db = Mock()
  291. def effect(*args, **kwargs):
  292. if opens.call_count > 1:
  293. return s.sh
  294. raise OSError()
  295. opens.side_effect = effect
  296. s.setup_schedule()
  297. s._remove_db.assert_called_with()
  298. s._store = {'__version__': 1}
  299. s.setup_schedule()
  300. s._store.clear = Mock()
  301. op = s.persistence.open = Mock()
  302. op.return_value = s._store
  303. s._store['tz'] = 'FUNKY'
  304. s.setup_schedule()
  305. op.assert_called_with(s.schedule_filename, writeback=True)
  306. s._store.clear.assert_called_with()
  307. s._store['utc_enabled'] = False
  308. s._store.clear = Mock()
  309. s.setup_schedule()
  310. s._store.clear.assert_called_with()
  311. def test_get_schedule(self):
  312. s = create_persistent_scheduler()[0](
  313. schedule_filename='schedule', app=self.app,
  314. )
  315. s._store = {'entries': {}}
  316. s.schedule = {'foo': 'bar'}
  317. self.assertDictEqual(s.schedule, {'foo': 'bar'})
  318. self.assertDictEqual(s._store['entries'], s.schedule)
  319. class test_Service(AppCase):
  320. def get_service(self):
  321. Scheduler, mock_shelve = create_persistent_scheduler()
  322. return beat.Service(app=self.app, scheduler_cls=Scheduler), mock_shelve
  323. def test_pickleable(self):
  324. s = beat.Service(app=self.app, scheduler_cls=Mock)
  325. self.assertTrue(loads(dumps(s)))
  326. def test_start(self):
  327. s, sh = self.get_service()
  328. schedule = s.scheduler.schedule
  329. self.assertIsInstance(schedule, dict)
  330. self.assertIsInstance(s.scheduler, beat.Scheduler)
  331. scheduled = list(schedule.keys())
  332. for task_name in keys(sh['entries']):
  333. self.assertIn(task_name, scheduled)
  334. s.sync()
  335. self.assertTrue(sh.closed)
  336. self.assertTrue(sh.synced)
  337. self.assertTrue(s._is_stopped.isSet())
  338. s.sync()
  339. s.stop(wait=False)
  340. self.assertTrue(s._is_shutdown.isSet())
  341. s.stop(wait=True)
  342. self.assertTrue(s._is_shutdown.isSet())
  343. p = s.scheduler._store
  344. s.scheduler._store = None
  345. try:
  346. s.scheduler.sync()
  347. finally:
  348. s.scheduler._store = p
  349. def test_start_embedded_process(self):
  350. s, sh = self.get_service()
  351. s._is_shutdown.set()
  352. s.start(embedded_process=True)
  353. def test_start_thread(self):
  354. s, sh = self.get_service()
  355. s._is_shutdown.set()
  356. s.start(embedded_process=False)
  357. def test_start_tick_raises_exit_error(self):
  358. s, sh = self.get_service()
  359. s.scheduler.tick_raises_exit = True
  360. s.start()
  361. self.assertTrue(s._is_shutdown.isSet())
  362. def test_start_manages_one_tick_before_shutdown(self):
  363. s, sh = self.get_service()
  364. s.scheduler.shutdown_service = s
  365. s.start()
  366. self.assertTrue(s._is_shutdown.isSet())
  367. class test_EmbeddedService(AppCase):
  368. def test_start_stop_process(self):
  369. try:
  370. import _multiprocessing # noqa
  371. except ImportError:
  372. raise SkipTest('multiprocessing not available')
  373. from billiard.process import Process
  374. s = beat.EmbeddedService(app=self.app)
  375. self.assertIsInstance(s, Process)
  376. self.assertIsInstance(s.service, beat.Service)
  377. s.service = MockService()
  378. class _Popen(object):
  379. terminated = False
  380. def terminate(self):
  381. self.terminated = True
  382. with patch('celery.platforms.close_open_fds'):
  383. s.run()
  384. self.assertTrue(s.service.started)
  385. s._popen = _Popen()
  386. s.stop()
  387. self.assertTrue(s.service.stopped)
  388. self.assertTrue(s._popen.terminated)
  389. def test_start_stop_threaded(self):
  390. s = beat.EmbeddedService(thread=True, app=self.app)
  391. from threading import Thread
  392. self.assertIsInstance(s, Thread)
  393. self.assertIsInstance(s.service, beat.Service)
  394. s.service = MockService()
  395. s.run()
  396. self.assertTrue(s.service.started)
  397. s.stop()
  398. self.assertTrue(s.service.stopped)
  399. class test_schedule(AppCase):
  400. def test_maybe_make_aware(self):
  401. x = schedule(10, app=self.app)
  402. x.utc_enabled = True
  403. d = x.maybe_make_aware(datetime.utcnow())
  404. self.assertTrue(d.tzinfo)
  405. x.utc_enabled = False
  406. d2 = x.maybe_make_aware(datetime.utcnow())
  407. self.assertIsNone(d2.tzinfo)
  408. def test_to_local(self):
  409. x = schedule(10, app=self.app)
  410. x.utc_enabled = True
  411. d = x.to_local(datetime.utcnow())
  412. self.assertIsNone(d.tzinfo)
  413. x.utc_enabled = False
  414. d = x.to_local(datetime.utcnow())
  415. self.assertTrue(d.tzinfo)