# test_beat.py 7.0 KB
  1. import unittest
  2. import logging
  3. from datetime import datetime, timedelta
  4. from celery import log
  5. from celery import beat
  6. from celery import conf
  7. from celery.utils import gen_unique_id
  8. from celery.task.base import PeriodicTask
  9. from celery.registry import TaskRegistry
  10. from celery.result import AsyncResult
  11. class MockShelve(dict):
  12. closed = False
  13. synced = False
  14. def close(self):
  15. self.closed = True
  16. def sync(self):
  17. self.synced = True
  18. class MockClockService(object):
  19. started = False
  20. stopped = False
  21. def __init__(self, *args, **kwargs):
  22. pass
  23. def start(self, **kwargs):
  24. self.started = True
  25. def stop(self, **kwargs):
  26. self.stopped = True
  27. class DuePeriodicTask(PeriodicTask):
  28. run_every = timedelta(seconds=1)
  29. applied = False
  30. def is_due(self, *args, **kwargs):
  31. return True, 100
  32. @classmethod
  33. def apply_async(self, *args, **kwargs):
  34. self.applied = True
  35. return AsyncResult(gen_unique_id())
  36. class DuePeriodicTaskRaising(PeriodicTask):
  37. run_every = timedelta(seconds=1)
  38. applied = False
  39. def is_due(self, *args, **kwargs):
  40. return True, 0
  41. @classmethod
  42. def apply_async(self, *args, **kwargs):
  43. raise Exception("FoozBaaz")
  44. class PendingPeriodicTask(PeriodicTask):
  45. run_every = timedelta(seconds=1)
  46. applied = False
  47. def is_due(self, *args, **kwargs):
  48. return False, 100
  49. @classmethod
  50. def apply_async(self, *args, **kwargs):
  51. self.applied = True
  52. return AsyncResult(gen_unique_id())
  53. class AdditionalTask(PeriodicTask):
  54. run_every = timedelta(days=7)
  55. @classmethod
  56. def apply_async(self, *args, **kwargs):
  57. raise Exception("FoozBaaz")
  58. class TestScheduleEntry(unittest.TestCase):
  59. def test_constructor(self):
  60. s = beat.ScheduleEntry(DuePeriodicTask.name)
  61. self.assertEquals(s.name, DuePeriodicTask.name)
  62. self.assertTrue(isinstance(s.last_run_at, datetime))
  63. self.assertEquals(s.total_run_count, 0)
  64. now = datetime.now()
  65. s = beat.ScheduleEntry(DuePeriodicTask.name, now, 300)
  66. self.assertEquals(s.name, DuePeriodicTask.name)
  67. self.assertEquals(s.last_run_at, now)
  68. self.assertEquals(s.total_run_count, 300)
  69. def test_next(self):
  70. s = beat.ScheduleEntry(DuePeriodicTask.name, None, 300)
  71. n = s.next()
  72. self.assertEquals(n.name, s.name)
  73. self.assertEquals(n.total_run_count, 301)
  74. self.assertTrue(n.last_run_at > s.last_run_at)
  75. def test_is_due(self):
  76. due = beat.ScheduleEntry(DuePeriodicTask.name)
  77. pending = beat.ScheduleEntry(PendingPeriodicTask.name)
  78. self.assertTrue(due.is_due(DuePeriodicTask())[0])
  79. self.assertFalse(pending.is_due(PendingPeriodicTask())[0])
  80. class TestScheduler(unittest.TestCase):
  81. def setUp(self):
  82. self.registry = TaskRegistry()
  83. self.registry.register(DuePeriodicTask)
  84. self.registry.register(PendingPeriodicTask)
  85. self.scheduler = beat.Scheduler(self.registry,
  86. max_interval=0.0001,
  87. logger=log.get_default_logger())
  88. def test_constructor(self):
  89. s = beat.Scheduler()
  90. self.assertTrue(isinstance(s.registry, TaskRegistry))
  91. self.assertTrue(isinstance(s.schedule, dict))
  92. self.assertTrue(isinstance(s.logger, logging.Logger))
  93. self.assertEquals(s.max_interval, conf.CELERYBEAT_MAX_LOOP_INTERVAL)
  94. def test_cleanup(self):
  95. self.scheduler.schedule["fbz"] = beat.ScheduleEntry("fbz")
  96. self.scheduler.cleanup()
  97. self.assertTrue("fbz" not in self.scheduler.schedule)
  98. def test_schedule_registry(self):
  99. self.registry.register(AdditionalTask)
  100. self.scheduler.schedule_registry()
  101. self.assertTrue(AdditionalTask.name in self.scheduler.schedule)
  102. def test_apply_async(self):
  103. due_task = self.registry[DuePeriodicTask.name]
  104. self.scheduler.apply_async(self.scheduler[due_task.name])
  105. self.assertTrue(due_task.applied)
  106. def test_apply_async_raises_SchedulingError_on_error(self):
  107. self.registry.register(AdditionalTask)
  108. self.scheduler.schedule_registry()
  109. add_task = self.registry[AdditionalTask.name]
  110. self.assertRaises(beat.SchedulingError,
  111. self.scheduler.apply_async,
  112. self.scheduler[add_task.name])
  113. def test_is_due(self):
  114. due = self.scheduler[DuePeriodicTask.name]
  115. pending = self.scheduler[PendingPeriodicTask.name]
  116. self.assertTrue(self.scheduler.is_due(due)[0])
  117. self.assertFalse(self.scheduler.is_due(pending)[0])
  118. def test_tick(self):
  119. self.scheduler.schedule.pop(DuePeriodicTaskRaising.name, None)
  120. self.registry.pop(DuePeriodicTaskRaising.name, None)
  121. self.assertEquals(self.scheduler.tick(),
  122. self.scheduler.max_interval)
  123. def test_quick_schedulingerror(self):
  124. self.registry.register(DuePeriodicTaskRaising)
  125. self.scheduler.schedule_registry()
  126. self.assertEquals(self.scheduler.tick(),
  127. self.scheduler.max_interval)
  128. class TestClockService(unittest.TestCase):
  129. def test_start(self):
  130. s = beat.ClockService()
  131. sh = MockShelve()
  132. s.open_schedule = lambda *a, **kw: sh
  133. self.assertTrue(isinstance(s.schedule, dict))
  134. self.assertTrue(isinstance(s.schedule, dict))
  135. self.assertTrue(isinstance(s.scheduler, beat.Scheduler))
  136. self.assertTrue(isinstance(s.scheduler, beat.Scheduler))
  137. self.assertTrue(s.schedule is sh)
  138. self.assertTrue(s._schedule is sh)
  139. s._in_sync = False
  140. s.sync()
  141. self.assertTrue(sh.closed)
  142. self.assertTrue(sh.synced)
  143. self.assertTrue(s._stopped.isSet())
  144. s.sync()
  145. s.stop(wait=False)
  146. self.assertTrue(s._shutdown.isSet())
  147. s.stop(wait=True)
  148. self.assertTrue(s._shutdown.isSet())
  149. class TestEmbeddedClockService(unittest.TestCase):
  150. def test_start_stop_process(self):
  151. s = beat.EmbeddedClockService()
  152. from multiprocessing import Process
  153. self.assertTrue(isinstance(s, Process))
  154. self.assertTrue(isinstance(s.clockservice, beat.ClockService))
  155. s.clockservice = MockClockService()
  156. class _Popen(object):
  157. terminated = False
  158. def terminate(self):
  159. self.terminated = True
  160. s.run()
  161. self.assertTrue(s.clockservice.started)
  162. s._popen = _Popen()
  163. s.stop()
  164. self.assertTrue(s.clockservice.stopped)
  165. self.assertTrue(s._popen.terminated)
  166. def test_start_stop_threaded(self):
  167. s = beat.EmbeddedClockService(thread=True)
  168. from threading import Thread
  169. self.assertTrue(isinstance(s, Thread))
  170. self.assertTrue(isinstance(s.clockservice, beat.ClockService))
  171. s.clockservice = MockClockService()
  172. s.run()
  173. self.assertTrue(s.clockservice.started)
  174. s.stop()
  175. self.assertTrue(s.clockservice.stopped)