# test_beat.py (14 KB)
  1. from __future__ import absolute_import
  2. import errno
  3. from datetime import datetime, timedelta
  4. from mock import Mock, call, patch
  5. from nose import SkipTest
  6. from celery import beat
  7. from celery import task
  8. from celery.five import keys, string_t
  9. from celery.result import AsyncResult
  10. from celery.schedules import schedule
  11. from celery.task.base import Task
  12. from celery.utils import uuid
  13. from celery.tests.utils import Case, patch_settings
  14. class Object(object):
  15. pass
  16. class MockShelve(dict):
  17. closed = False
  18. synced = False
  19. def close(self):
  20. self.closed = True
  21. def sync(self):
  22. self.synced = True
  23. class MockService(object):
  24. started = False
  25. stopped = False
  26. def __init__(self, *args, **kwargs):
  27. pass
  28. def start(self, **kwargs):
  29. self.started = True
  30. def stop(self, **kwargs):
  31. self.stopped = True
  32. class test_ScheduleEntry(Case):
  33. Entry = beat.ScheduleEntry
  34. def create_entry(self, **kwargs):
  35. entry = dict(name='celery.unittest.add',
  36. schedule=schedule(timedelta(seconds=10)),
  37. args=(2, 2),
  38. options={'routing_key': 'cpu'})
  39. return self.Entry(**dict(entry, **kwargs))
  40. def test_next(self):
  41. entry = self.create_entry(schedule=10)
  42. self.assertTrue(entry.last_run_at)
  43. self.assertIsInstance(entry.last_run_at, datetime)
  44. self.assertEqual(entry.total_run_count, 0)
  45. next_run_at = entry.last_run_at + timedelta(seconds=10)
  46. next_entry = entry.next(next_run_at)
  47. self.assertGreaterEqual(next_entry.last_run_at, next_run_at)
  48. self.assertEqual(next_entry.total_run_count, 1)
  49. def test_is_due(self):
  50. entry = self.create_entry(schedule=timedelta(seconds=10))
  51. due1, next_time_to_run1 = entry.is_due()
  52. self.assertFalse(due1)
  53. self.assertGreater(next_time_to_run1, 9)
  54. next_run_at = entry.last_run_at - timedelta(seconds=10)
  55. next_entry = entry.next(next_run_at)
  56. due2, next_time_to_run2 = next_entry.is_due()
  57. self.assertTrue(due2)
  58. self.assertGreater(next_time_to_run2, 9)
  59. def test_repr(self):
  60. entry = self.create_entry()
  61. self.assertIn('<Entry:', repr(entry))
  62. def test_update(self):
  63. entry = self.create_entry()
  64. self.assertEqual(entry.schedule, timedelta(seconds=10))
  65. self.assertTupleEqual(entry.args, (2, 2))
  66. self.assertDictEqual(entry.kwargs, {})
  67. self.assertDictEqual(entry.options, {'routing_key': 'cpu'})
  68. entry2 = self.create_entry(schedule=timedelta(minutes=20),
  69. args=(16, 16),
  70. kwargs={'callback': 'foo.bar.baz'},
  71. options={'routing_key': 'urgent'})
  72. entry.update(entry2)
  73. self.assertEqual(entry.schedule, schedule(timedelta(minutes=20)))
  74. self.assertTupleEqual(entry.args, (16, 16))
  75. self.assertDictEqual(entry.kwargs, {'callback': 'foo.bar.baz'})
  76. self.assertDictEqual(entry.options, {'routing_key': 'urgent'})
  77. class mScheduler(beat.Scheduler):
  78. def __init__(self, *args, **kwargs):
  79. self.sent = []
  80. beat.Scheduler.__init__(self, *args, **kwargs)
  81. def send_task(self, name=None, args=None, kwargs=None, **options):
  82. self.sent.append({'name': name,
  83. 'args': args,
  84. 'kwargs': kwargs,
  85. 'options': options})
  86. return AsyncResult(uuid())
  87. class mSchedulerSchedulingError(mScheduler):
  88. def send_task(self, *args, **kwargs):
  89. raise beat.SchedulingError('Could not apply task')
  90. class mSchedulerRuntimeError(mScheduler):
  91. def maybe_due(self, *args, **kwargs):
  92. raise RuntimeError('dict modified while itervalues')
  93. class mocked_schedule(schedule):
  94. def __init__(self, is_due, next_run_at):
  95. self._is_due = is_due
  96. self._next_run_at = next_run_at
  97. self.run_every = timedelta(seconds=1)
  98. self.nowfun = datetime.utcnow
  99. def is_due(self, last_run_at):
  100. return self._is_due, self._next_run_at
  101. always_due = mocked_schedule(True, 1)
  102. always_pending = mocked_schedule(False, 1)
  103. class test_Scheduler(Case):
  104. def test_custom_schedule_dict(self):
  105. custom = {'foo': 'bar'}
  106. scheduler = mScheduler(schedule=custom, lazy=True)
  107. self.assertIs(scheduler.data, custom)
  108. def test_apply_async_uses_registered_task_instances(self):
  109. through_task = [False]
  110. class MockTask(Task):
  111. @classmethod
  112. def apply_async(cls, *args, **kwargs):
  113. through_task[0] = True
  114. assert MockTask.name in MockTask._get_app().tasks
  115. scheduler = mScheduler()
  116. scheduler.apply_async(scheduler.Entry(task=MockTask.name))
  117. self.assertTrue(through_task[0])
  118. def test_apply_async_should_not_sync(self):
  119. @task()
  120. def not_sync():
  121. pass
  122. not_sync.apply_async = Mock()
  123. s = mScheduler()
  124. s._do_sync = Mock()
  125. s.should_sync = Mock()
  126. s.should_sync.return_value = True
  127. s.apply_async(s.Entry(task=not_sync.name))
  128. s._do_sync.assert_called_with()
  129. s._do_sync = Mock()
  130. s.should_sync.return_value = False
  131. s.apply_async(s.Entry(task=not_sync.name))
  132. self.assertFalse(s._do_sync.called)
  133. @patch('celery.app.base.Celery.send_task')
  134. def test_send_task(self, send_task):
  135. b = beat.Scheduler()
  136. b.send_task('tasks.add', countdown=10)
  137. send_task.assert_called_with('tasks.add', countdown=10)
  138. def test_info(self):
  139. scheduler = mScheduler()
  140. self.assertIsInstance(scheduler.info, string_t)
  141. def test_maybe_entry(self):
  142. s = mScheduler()
  143. entry = s.Entry(name='add every', task='tasks.add')
  144. self.assertIs(s._maybe_entry(entry.name, entry), entry)
  145. self.assertTrue(s._maybe_entry('add every', {
  146. 'task': 'tasks.add',
  147. }))
  148. def test_set_schedule(self):
  149. s = mScheduler()
  150. s.schedule = {'foo': 'bar'}
  151. self.assertEqual(s.data, {'foo': 'bar'})
  152. @patch('kombu.connection.Connection.ensure_connection')
  153. def test_ensure_connection_error_handler(self, ensure):
  154. s = mScheduler()
  155. self.assertTrue(s._ensure_connected())
  156. self.assertTrue(ensure.called)
  157. callback = ensure.call_args[0][0]
  158. callback(KeyError(), 5)
  159. def test_install_default_entries(self):
  160. with patch_settings(CELERY_TASK_RESULT_EXPIRES=None,
  161. CELERYBEAT_SCHEDULE={}):
  162. s = mScheduler()
  163. s.install_default_entries({})
  164. self.assertNotIn('celery.backend_cleanup', s.data)
  165. with patch_settings(CELERY_TASK_RESULT_EXPIRES=30,
  166. CELERYBEAT_SCHEDULE={}):
  167. s = mScheduler()
  168. s.install_default_entries({})
  169. self.assertIn('celery.backend_cleanup', s.data)
  170. def test_due_tick(self):
  171. scheduler = mScheduler()
  172. scheduler.add(name='test_due_tick',
  173. schedule=always_due,
  174. args=(1, 2),
  175. kwargs={'foo': 'bar'})
  176. self.assertEqual(scheduler.tick(), 1)
  177. @patch('celery.beat.error')
  178. def test_due_tick_SchedulingError(self, error):
  179. scheduler = mSchedulerSchedulingError()
  180. scheduler.add(name='test_due_tick_SchedulingError',
  181. schedule=always_due)
  182. self.assertEqual(scheduler.tick(), 1)
  183. self.assertTrue(error.called)
  184. def test_due_tick_RuntimeError(self):
  185. scheduler = mSchedulerRuntimeError()
  186. scheduler.add(name='test_due_tick_RuntimeError',
  187. schedule=always_due)
  188. self.assertEqual(scheduler.tick(), scheduler.max_interval)
  189. def test_pending_tick(self):
  190. scheduler = mScheduler()
  191. scheduler.add(name='test_pending_tick',
  192. schedule=always_pending)
  193. self.assertEqual(scheduler.tick(), 1)
  194. def test_honors_max_interval(self):
  195. scheduler = mScheduler()
  196. maxi = scheduler.max_interval
  197. scheduler.add(name='test_honors_max_interval',
  198. schedule=mocked_schedule(False, maxi * 4))
  199. self.assertEqual(scheduler.tick(), maxi)
  200. def test_ticks(self):
  201. scheduler = mScheduler()
  202. nums = [600, 300, 650, 120, 250, 36]
  203. s = dict(('test_ticks%s' % i,
  204. {'schedule': mocked_schedule(False, j)})
  205. for i, j in enumerate(nums))
  206. scheduler.update_from_dict(s)
  207. self.assertEqual(scheduler.tick(), min(nums))
  208. def test_schedule_no_remain(self):
  209. scheduler = mScheduler()
  210. scheduler.add(name='test_schedule_no_remain',
  211. schedule=mocked_schedule(False, None))
  212. self.assertEqual(scheduler.tick(), scheduler.max_interval)
  213. def test_interface(self):
  214. scheduler = mScheduler()
  215. scheduler.sync()
  216. scheduler.setup_schedule()
  217. scheduler.close()
  218. def test_merge_inplace(self):
  219. a = mScheduler()
  220. b = mScheduler()
  221. a.update_from_dict({'foo': {'schedule': mocked_schedule(True, 10)},
  222. 'bar': {'schedule': mocked_schedule(True, 20)}})
  223. b.update_from_dict({'bar': {'schedule': mocked_schedule(True, 40)},
  224. 'baz': {'schedule': mocked_schedule(True, 10)}})
  225. a.merge_inplace(b.schedule)
  226. self.assertNotIn('foo', a.schedule)
  227. self.assertIn('baz', a.schedule)
  228. self.assertEqual(a.schedule['bar'].schedule._next_run_at, 40)
  229. def create_persistent_scheduler(shelv=None):
  230. if shelv is None:
  231. shelv = MockShelve()
  232. class MockPersistentScheduler(beat.PersistentScheduler):
  233. sh = shelv
  234. persistence = Object()
  235. persistence.open = lambda *a, **kw: shelv
  236. tick_raises_exit = False
  237. shutdown_service = None
  238. def tick(self):
  239. if self.tick_raises_exit:
  240. raise SystemExit()
  241. if self.shutdown_service:
  242. self.shutdown_service._is_shutdown.set()
  243. return 0.0
  244. return MockPersistentScheduler, shelv
  245. class test_PersistentScheduler(Case):
  246. @patch('os.remove')
  247. def test_remove_db(self, remove):
  248. s = create_persistent_scheduler()[0](schedule_filename='schedule')
  249. s._remove_db()
  250. remove.assert_has_calls(
  251. [call('schedule' + suffix) for suffix in s.known_suffixes]
  252. )
  253. err = OSError()
  254. err.errno = errno.ENOENT
  255. remove.side_effect = err
  256. s._remove_db()
  257. err.errno = errno.EPERM
  258. with self.assertRaises(OSError):
  259. s._remove_db()
  260. def test_setup_schedule(self):
  261. s = create_persistent_scheduler()[0](schedule_filename='schedule')
  262. opens = s.persistence.open = Mock()
  263. s._remove_db = Mock()
  264. def effect(*args, **kwargs):
  265. if opens.call_count > 1:
  266. return s.sh
  267. raise OSError()
  268. opens.side_effect = effect
  269. s.setup_schedule()
  270. s._remove_db.assert_called_with()
  271. s._store = {'__version__': 1}
  272. s.setup_schedule()
  273. def test_get_schedule(self):
  274. s = create_persistent_scheduler()[0](schedule_filename='schedule')
  275. s._store = {'entries': {}}
  276. s.schedule = {'foo': 'bar'}
  277. self.assertDictEqual(s.schedule, {'foo': 'bar'})
  278. self.assertDictEqual(s._store['entries'], s.schedule)
  279. class test_Service(Case):
  280. def get_service(self):
  281. Scheduler, mock_shelve = create_persistent_scheduler()
  282. return beat.Service(scheduler_cls=Scheduler), mock_shelve
  283. def test_start(self):
  284. s, sh = self.get_service()
  285. schedule = s.scheduler.schedule
  286. self.assertIsInstance(schedule, dict)
  287. self.assertIsInstance(s.scheduler, beat.Scheduler)
  288. scheduled = list(schedule.keys())
  289. for task_name in keys(sh['entries']):
  290. self.assertIn(task_name, scheduled)
  291. s.sync()
  292. self.assertTrue(sh.closed)
  293. self.assertTrue(sh.synced)
  294. self.assertTrue(s._is_stopped.isSet())
  295. s.sync()
  296. s.stop(wait=False)
  297. self.assertTrue(s._is_shutdown.isSet())
  298. s.stop(wait=True)
  299. self.assertTrue(s._is_shutdown.isSet())
  300. p = s.scheduler._store
  301. s.scheduler._store = None
  302. try:
  303. s.scheduler.sync()
  304. finally:
  305. s.scheduler._store = p
  306. def test_start_embedded_process(self):
  307. s, sh = self.get_service()
  308. s._is_shutdown.set()
  309. s.start(embedded_process=True)
  310. def test_start_thread(self):
  311. s, sh = self.get_service()
  312. s._is_shutdown.set()
  313. s.start(embedded_process=False)
  314. def test_start_tick_raises_exit_error(self):
  315. s, sh = self.get_service()
  316. s.scheduler.tick_raises_exit = True
  317. s.start()
  318. self.assertTrue(s._is_shutdown.isSet())
  319. def test_start_manages_one_tick_before_shutdown(self):
  320. s, sh = self.get_service()
  321. s.scheduler.shutdown_service = s
  322. s.start()
  323. self.assertTrue(s._is_shutdown.isSet())
  324. class test_EmbeddedService(Case):
  325. def test_start_stop_process(self):
  326. try:
  327. import _multiprocessing # noqa
  328. except ImportError:
  329. raise SkipTest('multiprocessing not available')
  330. from billiard.process import Process
  331. s = beat.EmbeddedService()
  332. self.assertIsInstance(s, Process)
  333. self.assertIsInstance(s.service, beat.Service)
  334. s.service = MockService()
  335. class _Popen(object):
  336. terminated = False
  337. def terminate(self):
  338. self.terminated = True
  339. s.run()
  340. self.assertTrue(s.service.started)
  341. s._popen = _Popen()
  342. s.stop()
  343. self.assertTrue(s.service.stopped)
  344. self.assertTrue(s._popen.terminated)
  345. def test_start_stop_threaded(self):
  346. s = beat.EmbeddedService(thread=True)
  347. from threading import Thread
  348. self.assertIsInstance(s, Thread)
  349. self.assertIsInstance(s.service, beat.Service)
  350. s.service = MockService()
  351. s.run()
  352. self.assertTrue(s.service.started)
  353. s.stop()
  354. self.assertTrue(s.service.stopped)