Browse Source

fix issue with not running due tasks after beat restart (#4493)

* fix issue with not running due tasks after beat restart

* add myself to contibutors

* fix import sort
Igor Kasianov 7 years ago
parent
commit
1844ece4b0
3 changed files with 86 additions and 3 deletions
  1. 1 0
      CONTRIBUTORS.txt
  2. 11 2
      celery/beat.py
  3. 74 1
      t/unit/app/test_beat.py

+ 1 - 0
CONTRIBUTORS.txt

@@ -256,3 +256,4 @@ Tobias 'rixx' Kunze, 2017/08/20
 Mikhail Wolfson, 2017/12/11
 Alex Garel, 2018/01/04
 Régis Behmo 2018/01/20
+Igor Kasianov, 2018/01/20

+ 11 - 2
celery/beat.py

@@ -261,8 +261,17 @@ class Scheduler(object):
 
     def populate_heap(self, event_t=event_t, heapify=heapq.heapify):
         """Populate the heap with the data contained in the schedule."""
-        self._heap = [event_t(self._when(e, e.is_due()[1]) or 0, 5, e)
-                      for e in values(self.schedule)]
+        priority = 5
+        self._heap = []
+        for entry in values(self.schedule):
+            is_due, next_call_delay = entry.is_due()
+            self._heap.append(event_t(
+                self._when(
+                    entry,
+                    0 if is_due else next_call_delay
+                ) or 0,
+                priority, entry
+            ))
         heapify(self._heap)
 
     # pylint disable=redefined-outer-name

+ 74 - 1
t/unit/app/test_beat.py

@@ -7,7 +7,7 @@ from pickle import dumps, loads
 import pytest
 from case import Mock, call, patch, skip
 
-from celery import beat, uuid
+from celery import __version__, beat, uuid
 from celery.beat import event_t
 from celery.five import keys, string_t
 from celery.schedules import crontab, schedule
@@ -487,6 +487,29 @@ def create_persistent_scheduler(shelv=None):
     return MockPersistentScheduler, shelv
 
 
+def create_persistent_scheduler_w_call_logging(shelv=None):
+    if shelv is None:
+        shelv = MockShelve()
+
+    class MockPersistentScheduler(beat.PersistentScheduler):
+        sh = shelv
+        persistence = Bunch(
+            open=lambda *a, **kw: shelv,
+        )
+
+        def __init__(self, *args, **kwargs):
+            self.sent = []
+            beat.PersistentScheduler.__init__(self, *args, **kwargs)
+
+        def send_task(self, task=None, args=None, kwargs=None, **options):
+            self.sent.append({'task': task,
+                              'args': args,
+                              'kwargs': kwargs,
+                              'options': options})
+            return self.app.AsyncResult(uuid())
+    return MockPersistentScheduler, shelv
+
+
 class test_PersistentScheduler:
 
     @patch('os.remove')
@@ -543,6 +566,56 @@ class test_PersistentScheduler:
         assert s.schedule == {'foo': 'bar'}
         assert s._store[str('entries')] == s.schedule
 
+    def test_run_all_due_tasks_after_restart(self):
+        scheduler_class, shelve = create_persistent_scheduler_w_call_logging()
+
+        shelve['tz'] = 'UTC'
+        shelve['utc_enabled'] = True
+        shelve['__version__'] = __version__
+        cur_seconds = 20
+
+        def now_func():
+            return datetime(2018, 1, 1, 1, 11, cur_seconds)
+        app_schedule = {
+            'first_missed': {'schedule': crontab(
+                minute='*/10', nowfun=now_func), 'task': 'first_missed'},
+            'second_missed': {'schedule': crontab(
+                minute='*/1', nowfun=now_func), 'task': 'second_missed'},
+            'non_missed': {'schedule': crontab(
+                minute='*/13', nowfun=now_func), 'task': 'non_missed'}
+        }
+        shelve['entries'] = {
+            'first_missed': beat.ScheduleEntry(
+                'first_missed', 'first_missed',
+                last_run_at=now_func() - timedelta(minutes=2),
+                total_run_count=10,
+                schedule=app_schedule['first_missed']['schedule']),
+            'second_missed': beat.ScheduleEntry(
+                'second_missed', 'second_missed',
+                last_run_at=now_func() - timedelta(minutes=2),
+                total_run_count=10,
+                schedule=app_schedule['second_missed']['schedule']),
+            'non_missed': beat.ScheduleEntry(
+                'non_missed', 'non_missed',
+                last_run_at=now_func() - timedelta(minutes=2),
+                total_run_count=10,
+                schedule=app_schedule['non_missed']['schedule']),
+        }
+
+        self.app.conf.beat_schedule = app_schedule
+
+        scheduler = scheduler_class(self.app)
+
+        max_iter_number = 5
+        for i in range(max_iter_number):
+            delay = scheduler.tick()
+            if delay > 0:
+                break
+        assert {'first_missed', 'second_missed'} == {
+            item['task'] for item in scheduler.sent}
+        # ensure next call on the beginning of next min
+        assert abs(60 - cur_seconds - delay) < 1
+
 
 class test_Service: