test_beat.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. from __future__ import absolute_import
  2. import errno
  3. from datetime import datetime, timedelta
  4. from mock import Mock, call, patch
  5. from nose import SkipTest
  6. from pickle import dumps, loads
  7. from celery import beat
  8. from celery import task
  9. from celery.five import keys, string_t
  10. from celery.result import AsyncResult
  11. from celery.schedules import schedule
  12. from celery.utils import uuid
  13. from celery.tests.case import AppCase, patch_settings
  14. class Object(object):
  15. pass
  16. class MockShelve(dict):
  17. closed = False
  18. synced = False
  19. def close(self):
  20. self.closed = True
  21. def sync(self):
  22. self.synced = True
  23. class MockService(object):
  24. started = False
  25. stopped = False
  26. def __init__(self, *args, **kwargs):
  27. pass
  28. def start(self, **kwargs):
  29. self.started = True
  30. def stop(self, **kwargs):
  31. self.stopped = True
  32. class test_ScheduleEntry(AppCase):
  33. Entry = beat.ScheduleEntry
  34. def create_entry(self, **kwargs):
  35. entry = dict(name='celery.unittest.add',
  36. schedule=schedule(timedelta(seconds=10)),
  37. args=(2, 2),
  38. options={'routing_key': 'cpu'})
  39. return self.Entry(**dict(entry, **kwargs))
  40. def test_next(self):
  41. entry = self.create_entry(schedule=10)
  42. self.assertTrue(entry.last_run_at)
  43. self.assertIsInstance(entry.last_run_at, datetime)
  44. self.assertEqual(entry.total_run_count, 0)
  45. next_run_at = entry.last_run_at + timedelta(seconds=10)
  46. next_entry = entry.next(next_run_at)
  47. self.assertGreaterEqual(next_entry.last_run_at, next_run_at)
  48. self.assertEqual(next_entry.total_run_count, 1)
  49. def test_is_due(self):
  50. entry = self.create_entry(schedule=timedelta(seconds=10))
  51. due1, next_time_to_run1 = entry.is_due()
  52. self.assertFalse(due1)
  53. self.assertGreater(next_time_to_run1, 9)
  54. next_run_at = entry.last_run_at - timedelta(seconds=10)
  55. next_entry = entry.next(next_run_at)
  56. due2, next_time_to_run2 = next_entry.is_due()
  57. self.assertTrue(due2)
  58. self.assertGreater(next_time_to_run2, 9)
  59. def test_repr(self):
  60. entry = self.create_entry()
  61. self.assertIn('<Entry:', repr(entry))
  62. def test_update(self):
  63. entry = self.create_entry()
  64. self.assertEqual(entry.schedule, timedelta(seconds=10))
  65. self.assertTupleEqual(entry.args, (2, 2))
  66. self.assertDictEqual(entry.kwargs, {})
  67. self.assertDictEqual(entry.options, {'routing_key': 'cpu'})
  68. entry2 = self.create_entry(schedule=timedelta(minutes=20),
  69. args=(16, 16),
  70. kwargs={'callback': 'foo.bar.baz'},
  71. options={'routing_key': 'urgent'})
  72. entry.update(entry2)
  73. self.assertEqual(entry.schedule, schedule(timedelta(minutes=20)))
  74. self.assertTupleEqual(entry.args, (16, 16))
  75. self.assertDictEqual(entry.kwargs, {'callback': 'foo.bar.baz'})
  76. self.assertDictEqual(entry.options, {'routing_key': 'urgent'})
  77. class mScheduler(beat.Scheduler):
  78. def __init__(self, *args, **kwargs):
  79. self.sent = []
  80. beat.Scheduler.__init__(self, *args, **kwargs)
  81. def send_task(self, name=None, args=None, kwargs=None, **options):
  82. self.sent.append({'name': name,
  83. 'args': args,
  84. 'kwargs': kwargs,
  85. 'options': options})
  86. return AsyncResult(uuid(), app=self.app)
  87. class mSchedulerSchedulingError(mScheduler):
  88. def send_task(self, *args, **kwargs):
  89. raise beat.SchedulingError('Could not apply task')
  90. class mSchedulerRuntimeError(mScheduler):
  91. def maybe_due(self, *args, **kwargs):
  92. raise RuntimeError('dict modified while itervalues')
  93. class mocked_schedule(schedule):
  94. def __init__(self, is_due, next_run_at):
  95. self._is_due = is_due
  96. self._next_run_at = next_run_at
  97. self.run_every = timedelta(seconds=1)
  98. self.nowfun = datetime.utcnow
  99. def is_due(self, last_run_at):
  100. return self._is_due, self._next_run_at
  101. always_due = mocked_schedule(True, 1)
  102. always_pending = mocked_schedule(False, 1)
  103. class test_Scheduler(AppCase):
  104. def test_custom_schedule_dict(self):
  105. custom = {'foo': 'bar'}
  106. scheduler = mScheduler(app=self.app, schedule=custom, lazy=True)
  107. self.assertIs(scheduler.data, custom)
  108. def test_apply_async_uses_registered_task_instances(self):
  109. @self.app.task
  110. def foo():
  111. pass
  112. foo.apply_async = Mock(name='foo.apply_async')
  113. assert foo.name in foo._get_app().tasks
  114. scheduler = mScheduler(app=self.app)
  115. scheduler.apply_async(scheduler.Entry(task=foo.name))
  116. self.assertTrue(foo.apply_async.called)
  117. def test_apply_async_should_not_sync(self):
  118. @task()
  119. def not_sync():
  120. pass
  121. not_sync.apply_async = Mock()
  122. s = mScheduler(app=self.app)
  123. s._do_sync = Mock()
  124. s.should_sync = Mock()
  125. s.should_sync.return_value = True
  126. s.apply_async(s.Entry(task=not_sync.name))
  127. s._do_sync.assert_called_with()
  128. s._do_sync = Mock()
  129. s.should_sync.return_value = False
  130. s.apply_async(s.Entry(task=not_sync.name))
  131. self.assertFalse(s._do_sync.called)
  132. @patch('celery.app.base.Celery.send_task')
  133. def test_send_task(self, send_task):
  134. b = beat.Scheduler(app=self.app)
  135. b.send_task('tasks.add', countdown=10)
  136. send_task.assert_called_with('tasks.add', countdown=10)
  137. def test_info(self):
  138. scheduler = mScheduler(app=self.app)
  139. self.assertIsInstance(scheduler.info, string_t)
  140. def test_maybe_entry(self):
  141. s = mScheduler(app=self.app)
  142. entry = s.Entry(name='add every', task='tasks.add')
  143. self.assertIs(s._maybe_entry(entry.name, entry), entry)
  144. self.assertTrue(s._maybe_entry('add every', {
  145. 'task': 'tasks.add',
  146. }))
  147. def test_set_schedule(self):
  148. s = mScheduler(app=self.app)
  149. s.schedule = {'foo': 'bar'}
  150. self.assertEqual(s.data, {'foo': 'bar'})
  151. @patch('kombu.connection.Connection.ensure_connection')
  152. def test_ensure_connection_error_handler(self, ensure):
  153. s = mScheduler(app=self.app)
  154. self.assertTrue(s._ensure_connected())
  155. self.assertTrue(ensure.called)
  156. callback = ensure.call_args[0][0]
  157. callback(KeyError(), 5)
  158. def test_install_default_entries(self):
  159. with patch_settings(self.app,
  160. CELERY_TASK_RESULT_EXPIRES=None,
  161. CELERYBEAT_SCHEDULE={}):
  162. s = mScheduler(app=self.app)
  163. s.install_default_entries({})
  164. self.assertNotIn('celery.backend_cleanup', s.data)
  165. self.app.backend.supports_autoexpire = False
  166. with patch_settings(self.app,
  167. CELERY_TASK_RESULT_EXPIRES=30,
  168. CELERYBEAT_SCHEDULE={}):
  169. s = mScheduler(app=self.app)
  170. s.install_default_entries({})
  171. self.assertIn('celery.backend_cleanup', s.data)
  172. self.app.backend.supports_autoexpire = True
  173. try:
  174. with patch_settings(self.app,
  175. CELERY_TASK_RESULT_EXPIRES=31,
  176. CELERYBEAT_SCHEDULE={}):
  177. s = mScheduler(app=self.app)
  178. s.install_default_entries({})
  179. self.assertNotIn('celery.backend_cleanup', s.data)
  180. finally:
  181. self.app.backend.supports_autoexpire = False
  182. def test_due_tick(self):
  183. scheduler = mScheduler(app=self.app)
  184. scheduler.add(name='test_due_tick',
  185. schedule=always_due,
  186. args=(1, 2),
  187. kwargs={'foo': 'bar'})
  188. self.assertEqual(scheduler.tick(), 1)
  189. @patch('celery.beat.error')
  190. def test_due_tick_SchedulingError(self, error):
  191. scheduler = mSchedulerSchedulingError(app=self.app)
  192. scheduler.add(name='test_due_tick_SchedulingError',
  193. schedule=always_due)
  194. self.assertEqual(scheduler.tick(), 1)
  195. self.assertTrue(error.called)
  196. def test_due_tick_RuntimeError(self):
  197. scheduler = mSchedulerRuntimeError(app=self.app)
  198. scheduler.add(name='test_due_tick_RuntimeError',
  199. schedule=always_due)
  200. self.assertEqual(scheduler.tick(), scheduler.max_interval)
  201. def test_pending_tick(self):
  202. scheduler = mScheduler(app=self.app)
  203. scheduler.add(name='test_pending_tick',
  204. schedule=always_pending)
  205. self.assertEqual(scheduler.tick(), 1)
  206. def test_honors_max_interval(self):
  207. scheduler = mScheduler(app=self.app)
  208. maxi = scheduler.max_interval
  209. scheduler.add(name='test_honors_max_interval',
  210. schedule=mocked_schedule(False, maxi * 4))
  211. self.assertEqual(scheduler.tick(), maxi)
  212. def test_ticks(self):
  213. scheduler = mScheduler(app=self.app)
  214. nums = [600, 300, 650, 120, 250, 36]
  215. s = dict(('test_ticks%s' % i,
  216. {'schedule': mocked_schedule(False, j)})
  217. for i, j in enumerate(nums))
  218. scheduler.update_from_dict(s)
  219. self.assertEqual(scheduler.tick(), min(nums))
  220. def test_schedule_no_remain(self):
  221. scheduler = mScheduler(app=self.app)
  222. scheduler.add(name='test_schedule_no_remain',
  223. schedule=mocked_schedule(False, None))
  224. self.assertEqual(scheduler.tick(), scheduler.max_interval)
  225. def test_interface(self):
  226. scheduler = mScheduler(app=self.app)
  227. scheduler.sync()
  228. scheduler.setup_schedule()
  229. scheduler.close()
  230. def test_merge_inplace(self):
  231. a = mScheduler(app=self.app)
  232. b = mScheduler(app=self.app)
  233. a.update_from_dict({'foo': {'schedule': mocked_schedule(True, 10)},
  234. 'bar': {'schedule': mocked_schedule(True, 20)}})
  235. b.update_from_dict({'bar': {'schedule': mocked_schedule(True, 40)},
  236. 'baz': {'schedule': mocked_schedule(True, 10)}})
  237. a.merge_inplace(b.schedule)
  238. self.assertNotIn('foo', a.schedule)
  239. self.assertIn('baz', a.schedule)
  240. self.assertEqual(a.schedule['bar'].schedule._next_run_at, 40)
  241. def create_persistent_scheduler(shelv=None):
  242. if shelv is None:
  243. shelv = MockShelve()
  244. class MockPersistentScheduler(beat.PersistentScheduler):
  245. sh = shelv
  246. persistence = Object()
  247. persistence.open = lambda *a, **kw: shelv
  248. tick_raises_exit = False
  249. shutdown_service = None
  250. def tick(self):
  251. if self.tick_raises_exit:
  252. raise SystemExit()
  253. if self.shutdown_service:
  254. self.shutdown_service._is_shutdown.set()
  255. return 0.0
  256. return MockPersistentScheduler, shelv
  257. class test_PersistentScheduler(AppCase):
  258. @patch('os.remove')
  259. def test_remove_db(self, remove):
  260. s = create_persistent_scheduler()[0](app=self.app,
  261. schedule_filename='schedule')
  262. s._remove_db()
  263. remove.assert_has_calls(
  264. [call('schedule' + suffix) for suffix in s.known_suffixes]
  265. )
  266. err = OSError()
  267. err.errno = errno.ENOENT
  268. remove.side_effect = err
  269. s._remove_db()
  270. err.errno = errno.EPERM
  271. with self.assertRaises(OSError):
  272. s._remove_db()
  273. def test_setup_schedule(self):
  274. s = create_persistent_scheduler()[0](app=self.app,
  275. schedule_filename='schedule')
  276. opens = s.persistence.open = Mock()
  277. s._remove_db = Mock()
  278. def effect(*args, **kwargs):
  279. if opens.call_count > 1:
  280. return s.sh
  281. raise OSError()
  282. opens.side_effect = effect
  283. s.setup_schedule()
  284. s._remove_db.assert_called_with()
  285. s._store = {'__version__': 1}
  286. s.setup_schedule()
  287. s._store.clear = Mock()
  288. op = s.persistence.open = Mock()
  289. op.return_value = s._store
  290. s._store['tz'] = 'FUNKY'
  291. s.setup_schedule()
  292. op.assert_called_with(s.schedule_filename, writeback=True)
  293. s._store.clear.assert_called_with()
  294. s._store['utc_enabled'] = False
  295. s._store.clear = Mock()
  296. s.setup_schedule()
  297. s._store.clear.assert_called_with()
  298. def test_get_schedule(self):
  299. s = create_persistent_scheduler()[0](
  300. schedule_filename='schedule', app=self.app,
  301. )
  302. s._store = {'entries': {}}
  303. s.schedule = {'foo': 'bar'}
  304. self.assertDictEqual(s.schedule, {'foo': 'bar'})
  305. self.assertDictEqual(s._store['entries'], s.schedule)
  306. class test_Service(AppCase):
  307. def get_service(self):
  308. Scheduler, mock_shelve = create_persistent_scheduler()
  309. return beat.Service(app=self.app, scheduler_cls=Scheduler), mock_shelve
  310. def test_pickleable(self):
  311. s = beat.Service(app=self.app, scheduler_cls=Mock)
  312. self.assertTrue(loads(dumps(s)))
  313. def test_start(self):
  314. s, sh = self.get_service()
  315. schedule = s.scheduler.schedule
  316. self.assertIsInstance(schedule, dict)
  317. self.assertIsInstance(s.scheduler, beat.Scheduler)
  318. scheduled = list(schedule.keys())
  319. for task_name in keys(sh['entries']):
  320. self.assertIn(task_name, scheduled)
  321. s.sync()
  322. self.assertTrue(sh.closed)
  323. self.assertTrue(sh.synced)
  324. self.assertTrue(s._is_stopped.isSet())
  325. s.sync()
  326. s.stop(wait=False)
  327. self.assertTrue(s._is_shutdown.isSet())
  328. s.stop(wait=True)
  329. self.assertTrue(s._is_shutdown.isSet())
  330. p = s.scheduler._store
  331. s.scheduler._store = None
  332. try:
  333. s.scheduler.sync()
  334. finally:
  335. s.scheduler._store = p
  336. def test_start_embedded_process(self):
  337. s, sh = self.get_service()
  338. s._is_shutdown.set()
  339. s.start(embedded_process=True)
  340. def test_start_thread(self):
  341. s, sh = self.get_service()
  342. s._is_shutdown.set()
  343. s.start(embedded_process=False)
  344. def test_start_tick_raises_exit_error(self):
  345. s, sh = self.get_service()
  346. s.scheduler.tick_raises_exit = True
  347. s.start()
  348. self.assertTrue(s._is_shutdown.isSet())
  349. def test_start_manages_one_tick_before_shutdown(self):
  350. s, sh = self.get_service()
  351. s.scheduler.shutdown_service = s
  352. s.start()
  353. self.assertTrue(s._is_shutdown.isSet())
  354. class test_EmbeddedService(AppCase):
  355. def test_start_stop_process(self):
  356. try:
  357. import _multiprocessing # noqa
  358. except ImportError:
  359. raise SkipTest('multiprocessing not available')
  360. from billiard.process import Process
  361. s = beat.EmbeddedService(app=self.app)
  362. self.assertIsInstance(s, Process)
  363. self.assertIsInstance(s.service, beat.Service)
  364. s.service = MockService()
  365. class _Popen(object):
  366. terminated = False
  367. def terminate(self):
  368. self.terminated = True
  369. s.run()
  370. self.assertTrue(s.service.started)
  371. s._popen = _Popen()
  372. s.stop()
  373. self.assertTrue(s.service.stopped)
  374. self.assertTrue(s._popen.terminated)
  375. def test_start_stop_threaded(self):
  376. s = beat.EmbeddedService(thread=True, app=self.app)
  377. from threading import Thread
  378. self.assertIsInstance(s, Thread)
  379. self.assertIsInstance(s.service, beat.Service)
  380. s.service = MockService()
  381. s.run()
  382. self.assertTrue(s.service.started)
  383. s.stop()
  384. self.assertTrue(s.service.stopped)
  385. class test_schedule(AppCase):
  386. def test_maybe_make_aware(self):
  387. x = schedule(10)
  388. x.utc_enabled = True
  389. d = x.maybe_make_aware(datetime.utcnow())
  390. self.assertTrue(d.tzinfo)
  391. x.utc_enabled = False
  392. d2 = x.maybe_make_aware(datetime.utcnow())
  393. self.assertIsNone(d2.tzinfo)
  394. def test_to_local(self):
  395. x = schedule(10)
  396. x.utc_enabled = True
  397. d = x.to_local(datetime.utcnow())
  398. self.assertIsNone(d.tzinfo)
  399. x.utc_enabled = False
  400. d = x.to_local(datetime.utcnow())
  401. self.assertTrue(d.tzinfo)