Ver código fonte

Add tests for ensuring schedule changes are detected

Brian May 8 anos atrás
pai
commit
b27c0f143b
1 arquivos alterados com 62 adições e 1 exclusões
  1. 62 1
      t/unit/app/test_beat.py

+ 62 - 1
t/unit/app/test_beat.py

@@ -8,7 +8,7 @@ from celery import beat
 from celery import uuid
 from celery.beat import event_t
 from celery.five import keys, string_t
-from celery.schedules import schedule
+from celery.schedules import schedule, crontab
 from celery.utils.objects import Bunch
 
 
@@ -315,6 +315,23 @@ class test_Scheduler:
         scheduler.update_from_dict(s)
         assert scheduler.tick() == min(nums) - 0.010
 
+    def test_ticks_schedule_change(self):
+        # initialise schedule and check heap is not initialized
+        scheduler = mScheduler(app=self.app)
+        assert scheduler._heap is None
+
+        # set initial schedule and check heap is updated
+        schedule_5 = schedule(5)
+        scheduler.add(name='test_schedule', schedule=schedule_5)
+        scheduler.tick()
+        assert scheduler._heap[0].entry.schedule == schedule_5
+
+        # update schedule and check heap is updated
+        schedule_10 = schedule(10)
+        scheduler.add(name='test_schedule', schedule=schedule(10))
+        scheduler.tick()
+        assert scheduler._heap[0].entry.schedule == schedule_10
+
     def test_schedule_no_remain(self):
         scheduler = mScheduler(app=self.app)
         scheduler.add(name='test_schedule_no_remain',
@@ -349,6 +366,50 @@ class test_Scheduler:
         scheduler.populate_heap()
         assert scheduler._heap == [event_t(1, 5, scheduler.schedule['foo'])]
 
+    def create_schedule_entry(self, schedule):
+        entry = dict(
+            name='celery.unittest.add',
+            schedule=schedule,
+            app=self.app,
+        )
+        return beat.ScheduleEntry(**dict(entry))
+
+    def test_schedule_equal_schedule_vs_schedule_success(self):
+        scheduler = beat.Scheduler(app=self.app)
+        a = {'a': self.create_schedule_entry(schedule(5))}
+        b = {'a': self.create_schedule_entry(schedule(5))}
+        assert scheduler.schedules_equal(a, b)
+
+    def test_schedule_equal_schedule_vs_schedule_fail(self):
+        scheduler = beat.Scheduler(app=self.app)
+        a = {'a': self.create_schedule_entry(schedule(5))}
+        b = {'a': self.create_schedule_entry(schedule(10))}
+        assert not scheduler.schedules_equal(a, b)
+
+    def test_schedule_equal_crontab_vs_crontab_success(self):
+        scheduler = beat.Scheduler(app=self.app)
+        a = {'a': self.create_schedule_entry(crontab(minute=5))}
+        b = {'a': self.create_schedule_entry(crontab(minute=5))}
+        assert scheduler.schedules_equal(a, b)
+
+    def test_schedule_equal_crontab_vs_crontab_fail(self):
+        scheduler = beat.Scheduler(app=self.app)
+        a = {'a': self.create_schedule_entry(crontab(minute=5))}
+        b = {'a': self.create_schedule_entry(crontab(minute=10))}
+        assert not scheduler.schedules_equal(a, b)
+
+    def test_schedule_equal_crontab_vs_schedule_fail(self):
+        scheduler = beat.Scheduler(app=self.app)
+        a = {'a': self.create_schedule_entry(crontab(minute=5))}
+        b = {'a': self.create_schedule_entry(schedule(5))}
+        assert not scheduler.schedules_equal(a, b)
+
+    def test_schedule_equal_different_key_fail(self):
+        scheduler = beat.Scheduler(app=self.app)
+        a = {'a': self.create_schedule_entry(schedule(5))}
+        b = {'b': self.create_schedule_entry(schedule(5))}
+        assert not scheduler.schedules_equal(a, b)
+
 
 def create_persistent_scheduler(shelv=None):
     if shelv is None: