Browse Source

Provide a transparent method to update the Scheduler heap (#3721)

* Provide a transparent method to update the heap

In previous versions, we could dynamically modify the `schedule` dict
and see the change reflected in the scheduled tasks, since the `tick`
method iterated directly over it.

In version 4.0, a heap is used instead, and it is set after init (when
`_heap` is None). This breaks our ability to dynamically change the
`schedule` and see it reflected without directly modifying the `_heap`
attribute, thus breaking the abstraction.

Since there are already methods that allow modifying the `schedule`
without reaching into the inner implementation, providing a method to
update the `_heap` without knowing its internals makes sense.

* Remove unnecessary passing of arguments

The removed arguments are not meant to be changed by the user; they exist only for performance.

* PEP 8

* Fix function docstring

* Remove unused argument

* Add myself to contributors
Alejandro Pernin 8 years ago
parent
commit
cfcf3f8e77
3 changed files with 31 additions and 12 deletions
  1. 1 0
      CONTRIBUTORS.txt
  2. 20 12
      celery/beat.py
  3. 10 0
      t/unit/app/test_beat.py

+ 1 - 0
CONTRIBUTORS.txt

@@ -226,3 +226,4 @@ zhengxiaowai, 2016/12/07
 Michael Howitz, 2016/12/08
 Michael Howitz, 2016/12/08
 Andreas Pelme, 2016/12/13
 Andreas Pelme, 2016/12/13
 Mike Chen, 2016/12/20
 Mike Chen, 2016/12/20
+Alejandro Pernin, 2016/12/23

+ 20 - 12
celery/beat.py

@@ -232,10 +232,21 @@ class Scheduler(object):
     def is_due(self, entry):
     def is_due(self, entry):
         return entry.is_due()
         return entry.is_due()
 
 
+    def _when(self, entry, next_time_to_run, mktime=time.mktime):
+        adjust = self.adjust
+
+        return (mktime(entry.schedule.now().timetuple()) +
+                (adjust(next_time_to_run) or 0))
+
+    def populate_heap(self, event_t=event_t, heapify=heapq.heapify):
+        """Populate the heap with the data contained in the schedule."""
+        self._heap = [event_t(self._when(e, e.is_due()[1]) or 0, 5, e)
+                      for e in values(self.schedule)]
+        heapify(self._heap)
+
     # pylint disable=redefined-outer-name
     # pylint disable=redefined-outer-name
-    def tick(self, event_t=event_t, min=min,
-             heappop=heapq.heappop, heappush=heapq.heappush,
-             heapify=heapq.heapify, mktime=time.mktime):
+    def tick(self, event_t=event_t, min=min, heappop=heapq.heappop,
+             heappush=heapq.heappush):
         """Run a tick - one iteration of the scheduler.
         """Run a tick - one iteration of the scheduler.
 
 
         Executes one due task per call.
         Executes one due task per call.
@@ -243,17 +254,14 @@ class Scheduler(object):
         Returns:
         Returns:
             float: preferred delay in seconds for next call.
             float: preferred delay in seconds for next call.
         """
         """
-        def _when(entry, next_time_to_run):
-            return (mktime(entry.schedule.now().timetuple()) +
-                    (adjust(next_time_to_run) or 0))
-
         adjust = self.adjust
         adjust = self.adjust
         max_interval = self.max_interval
         max_interval = self.max_interval
+
+        if self._heap is None:
+            self.populate_heap()
+
         H = self._heap
         H = self._heap
-        if H is None:
-            H = self._heap = [event_t(_when(e, e.is_due()[1]) or 0, 5, e)
-                              for e in values(self.schedule)]
-            heapify(H)
+
         if not H:
         if not H:
             return max_interval
             return max_interval
 
 
@@ -265,7 +273,7 @@ class Scheduler(object):
             if verify is event:
             if verify is event:
                 next_entry = self.reserve(entry)
                 next_entry = self.reserve(entry)
                 self.apply_entry(entry, producer=self.producer)
                 self.apply_entry(entry, producer=self.producer)
-                heappush(H, event_t(_when(next_entry, next_time_to_run),
+                heappush(H, event_t(self._when(next_entry, next_time_to_run),
                                     event[1], next_entry))
                                     event[1], next_entry))
                 return 0
                 return 0
             else:
             else:

+ 10 - 0
t/unit/app/test_beat.py

@@ -6,6 +6,7 @@ from pickle import dumps, loads
 from case import Mock, call, patch, skip
 from case import Mock, call, patch, skip
 from celery import beat
 from celery import beat
 from celery import uuid
 from celery import uuid
+from celery.beat import event_t
 from celery.five import keys, string_t
 from celery.five import keys, string_t
 from celery.schedules import schedule
 from celery.schedules import schedule
 from celery.utils.objects import Bunch
 from celery.utils.objects import Bunch
@@ -339,6 +340,15 @@ class test_Scheduler:
         assert 'baz' in a.schedule
         assert 'baz' in a.schedule
         assert a.schedule['bar'].schedule._next_run_at == 40
         assert a.schedule['bar'].schedule._next_run_at == 40
 
 
+    @patch('celery.beat.Scheduler._when', return_value=1)
+    def test_populate_heap(self, _when):
+        scheduler = mScheduler(app=self.app)
+        scheduler.update_from_dict(
+            {'foo': {'schedule': mocked_schedule(True, 10)}}
+        )
+        scheduler.populate_heap()
+        assert scheduler._heap == [event_t(1, 5, scheduler.schedule['foo'])]
+
 
 
 def create_persistent_scheduler(shelv=None):
 def create_persistent_scheduler(shelv=None):
     if shelv is None:
     if shelv is None: