# test_beat.py 9.1 KB
  1. import logging
  2. import unittest2 as unittest
  3. from datetime import datetime, timedelta
  4. from celery import beat
  5. from celery.result import AsyncResult
  6. from celery.schedules import schedule
  7. from celery.utils import gen_unique_id
  8. class Object(object):
  9. pass
  10. class MockShelve(dict):
  11. closed = False
  12. synced = False
  13. def close(self):
  14. self.closed = True
  15. def sync(self):
  16. self.synced = True
  17. class MockService(object):
  18. started = False
  19. stopped = False
  20. def __init__(self, *args, **kwargs):
  21. pass
  22. def start(self, **kwargs):
  23. self.started = True
  24. def stop(self, **kwargs):
  25. self.stopped = True
  26. class test_ScheduleEntry(unittest.TestCase):
  27. Entry = beat.ScheduleEntry
  28. def create_entry(self, **kwargs):
  29. entry = dict(name="celery.unittest.add",
  30. schedule=schedule(timedelta(seconds=10)),
  31. args=(2, 2),
  32. options={"routing_key": "cpu"})
  33. return self.Entry(**dict(entry, **kwargs))
  34. def test_next(self):
  35. entry = self.create_entry(schedule=10)
  36. self.assertTrue(entry.last_run_at)
  37. self.assertIsInstance(entry.last_run_at, datetime)
  38. self.assertEqual(entry.total_run_count, 0)
  39. next_run_at = entry.last_run_at + timedelta(seconds=10)
  40. next = entry.next(next_run_at)
  41. self.assertGreaterEqual(next.last_run_at, next_run_at)
  42. self.assertEqual(next.total_run_count, 1)
  43. def test_is_due(self):
  44. entry = self.create_entry(schedule=timedelta(seconds=10))
  45. due1, next_time_to_run1 = entry.is_due()
  46. self.assertFalse(due1)
  47. self.assertGreater(next_time_to_run1, 9)
  48. next_run_at = entry.last_run_at - timedelta(seconds=10)
  49. next = entry.next(next_run_at)
  50. due2, next_time_to_run2 = next.is_due()
  51. self.assertTrue(due2)
  52. self.assertGreater(next_time_to_run2, 9)
  53. def test_repr(self):
  54. entry = self.create_entry()
  55. self.assertIn("<Entry:", repr(entry))
  56. def test_update(self):
  57. entry = self.create_entry()
  58. self.assertEqual(entry.schedule, timedelta(seconds=10))
  59. self.assertTupleEqual(entry.args, (2, 2))
  60. self.assertDictEqual(entry.kwargs, {})
  61. self.assertDictEqual(entry.options, {"routing_key": "cpu"})
  62. entry2 = self.create_entry(schedule=timedelta(minutes=20),
  63. args=(16, 16),
  64. kwargs={"callback": "foo.bar.baz"},
  65. options={"routing_key": "urgent"})
  66. entry.update(entry2)
  67. self.assertEqual(entry.schedule, schedule(timedelta(minutes=20)))
  68. self.assertTupleEqual(entry.args, (16, 16))
  69. self.assertDictEqual(entry.kwargs, {"callback": "foo.bar.baz"})
  70. self.assertDictEqual(entry.options, {"routing_key": "urgent"})
  71. class MockLogger(logging.Logger):
  72. def __init__(self, *args, **kwargs):
  73. self.logged = []
  74. logging.Logger.__init__(self, *args, **kwargs)
  75. def _log(self, level, msg, args, **kwargs):
  76. self.logged.append((level, msg))
  77. class mScheduler(beat.Scheduler):
  78. def __init__(self, *args, **kwargs):
  79. self.sent = []
  80. beat.Scheduler.__init__(self, *args, **kwargs)
  81. self.logger = MockLogger("celery.beat", logging.ERROR)
  82. def send_task(self, name=None, args=None, kwargs=None, **options):
  83. self.sent.append({"name": name,
  84. "args": args,
  85. "kwargs": kwargs,
  86. "options": options})
  87. return AsyncResult(gen_unique_id())
  88. class mSchedulerSchedulingError(mScheduler):
  89. def send_task(self, *args, **kwargs):
  90. raise beat.SchedulingError("Could not apply task")
  91. class mSchedulerRuntimeError(mScheduler):
  92. def maybe_due(self, *args, **kwargs):
  93. raise RuntimeError("dict modified while itervalues")
  94. class mocked_schedule(schedule):
  95. def __init__(self, is_due, next_run_at):
  96. self._is_due = is_due
  97. self._next_run_at = next_run_at
  98. self.run_every = timedelta(seconds=1)
  99. def is_due(self, last_run_at):
  100. return self._is_due, self._next_run_at
  101. always_due = mocked_schedule(True, 1)
  102. always_pending = mocked_schedule(False, 1)
  103. class test_Scheduler(unittest.TestCase):
  104. def test_due_tick(self):
  105. scheduler = mScheduler()
  106. scheduler.add(name="test_due_tick",
  107. schedule=always_due,
  108. args=(1, 2),
  109. kwargs={"foo": "bar"})
  110. self.assertEqual(scheduler.tick(), 1)
  111. def test_due_tick_SchedulingError(self):
  112. scheduler = mSchedulerSchedulingError()
  113. scheduler.add(name="test_due_tick_SchedulingError",
  114. schedule=always_due)
  115. self.assertEqual(scheduler.tick(), 1)
  116. self.assertTrue(scheduler.logger.logged[0])
  117. level, msg = scheduler.logger.logged[0]
  118. self.assertEqual(level, logging.ERROR)
  119. self.assertIn("Couldn't apply scheduled task", msg)
  120. def test_due_tick_RuntimeError(self):
  121. scheduler = mSchedulerRuntimeError()
  122. scheduler.add(name="test_due_tick_RuntimeError",
  123. schedule=always_due)
  124. self.assertEqual(scheduler.tick(), scheduler.max_interval)
  125. def test_pending_tick(self):
  126. scheduler = mScheduler()
  127. scheduler.add(name="test_pending_tick",
  128. schedule=always_pending)
  129. self.assertEqual(scheduler.tick(), 1)
  130. def test_honors_max_interval(self):
  131. scheduler = mScheduler()
  132. maxi = scheduler.max_interval
  133. scheduler.add(name="test_honors_max_interval",
  134. schedule=mocked_schedule(False, maxi * 4))
  135. self.assertEqual(scheduler.tick(), maxi)
  136. def test_ticks(self):
  137. scheduler = mScheduler()
  138. nums = [600, 300, 650, 120, 250, 36]
  139. s = dict(("test_ticks%s" % i,
  140. {"schedule": mocked_schedule(False, j)})
  141. for i, j in enumerate(nums))
  142. scheduler.update_from_dict(s)
  143. self.assertEqual(scheduler.tick(), min(nums))
  144. def test_schedule_no_remain(self):
  145. scheduler = mScheduler()
  146. scheduler.add(name="test_schedule_no_remain",
  147. schedule=mocked_schedule(False, None))
  148. self.assertEqual(scheduler.tick(), scheduler.max_interval)
  149. def test_interface(self):
  150. scheduler = mScheduler()
  151. scheduler.sync()
  152. scheduler.setup_schedule()
  153. scheduler.close()
  154. def test_set_schedule(self):
  155. scheduler = mScheduler()
  156. a, b = scheduler.schedule, {}
  157. scheduler.schedule = b
  158. self.assertIs(scheduler.schedule, b)
  159. def test_merge_inplace(self):
  160. a = mScheduler()
  161. b = mScheduler()
  162. a.update_from_dict({"foo": {"schedule": mocked_schedule(True, 10)},
  163. "bar": {"schedule": mocked_schedule(True, 20)}})
  164. b.update_from_dict({"bar": {"schedule": mocked_schedule(True, 40)},
  165. "baz": {"schedule": mocked_schedule(True, 10)}})
  166. a.merge_inplace(b)
  167. self.assertNotIn("foo", a)
  168. self.assertIn("baz", a)
  169. self.assertEqual(a["bar"].schedule._next_run_at, 40)
  170. class test_Service(unittest.TestCase):
  171. def test_start(self):
  172. sh = MockShelve()
  173. class PersistentScheduler(beat.PersistentScheduler):
  174. persistence = Object()
  175. persistence.open = lambda *a, **kw: sh
  176. s = beat.Service(scheduler_cls=PersistentScheduler)
  177. self.assertIsInstance(s.schedule, dict)
  178. self.assertIsInstance(s.scheduler, beat.Scheduler)
  179. scheduled = s.schedule.keys()
  180. for task_name in sh.keys():
  181. self.assertIn(task_name, scheduled)
  182. s.sync()
  183. self.assertTrue(sh.closed)
  184. self.assertTrue(sh.synced)
  185. self.assertTrue(s._stopped.isSet())
  186. s.sync()
  187. s.stop(wait=False)
  188. self.assertTrue(s._shutdown.isSet())
  189. s.stop(wait=True)
  190. self.assertTrue(s._shutdown.isSet())
  191. p = s.scheduler._store
  192. s.scheduler._store = None
  193. try:
  194. s.scheduler.sync()
  195. finally:
  196. s.scheduler._store = p
  197. class test_EmbeddedService(unittest.TestCase):
  198. def test_start_stop_process(self):
  199. s = beat.EmbeddedService()
  200. from multiprocessing import Process
  201. self.assertIsInstance(s, Process)
  202. self.assertIsInstance(s.service, beat.Service)
  203. s.service = MockService()
  204. class _Popen(object):
  205. terminated = False
  206. def terminate(self):
  207. self.terminated = True
  208. s.run()
  209. self.assertTrue(s.service.started)
  210. s._popen = _Popen()
  211. s.stop()
  212. self.assertTrue(s.service.stopped)
  213. self.assertTrue(s._popen.terminated)
  214. def test_start_stop_threaded(self):
  215. s = beat.EmbeddedService(thread=True)
  216. from threading import Thread
  217. self.assertIsInstance(s, Thread)
  218. self.assertIsInstance(s.service, beat.Service)
  219. s.service = MockService()
  220. s.run()
  221. self.assertTrue(s.service.started)
  222. s.stop()
  223. self.assertTrue(s.service.stopped)