Browse Source

fix: celery.Scheduler._when return utctiemstamp (#5114)

#5113
yywing 6 years ago
parent
commit
58e6219c12
2 changed files with 26 additions and 6 deletions
  1. 6 4
      celery/beat.py
  2. 20 2
      t/unit/app/test_beat.py

+ 6 - 4
celery/beat.py

@@ -10,6 +10,7 @@ import shelve
 import sys
 import time
 import traceback
+from calendar import timegm
 from collections import namedtuple
 from functools import total_ordering
 from threading import Event, Thread
@@ -26,7 +27,7 @@ from .five import (items, monotonic, python_2_unicode_compatible, reraise,
 from .schedules import crontab, maybe_schedule
 from .utils.imports import load_extension_class_names, symbol_by_name
 from .utils.log import get_logger, iter_open_logger_fds
-from .utils.time import humanize_seconds
+from .utils.time import humanize_seconds, maybe_make_aware
 
 __all__ = (
     'SchedulingError', 'ScheduleEntry', 'Scheduler',
@@ -253,12 +254,13 @@ class Scheduler(object):
     def is_due(self, entry):
         return entry.is_due()
 
-    def _when(self, entry, next_time_to_run, mktime=time.mktime):
+    def _when(self, entry, next_time_to_run, mktime=timegm):
+        """Return a utc timestamp, make sure heapq in currect order."""
         adjust = self.adjust
 
-        as_now = entry.default_now()
+        as_now = maybe_make_aware(entry.default_now())
 
-        return (mktime(as_now.timetuple()) +
+        return (mktime(as_now.utctimetuple()) +
                 as_now.microsecond / 1e6 +
                 (adjust(next_time_to_run) or 0))
 

+ 20 - 2
t/unit/app/test_beat.py

@@ -1,6 +1,7 @@
 from __future__ import absolute_import, unicode_literals
 
 import errno
+import pytz
 from datetime import datetime, timedelta
 from pickle import dumps, loads
 
@@ -143,11 +144,12 @@ class mSchedulerRuntimeError(mScheduler):
 
 class mocked_schedule(schedule):
 
-    def __init__(self, is_due, next_run_at):
+    def __init__(self, is_due, next_run_at, nowfun=datetime.utcnow):
         self._is_due = is_due
         self._next_run_at = next_run_at
         self.run_every = timedelta(seconds=1)
-        self.nowfun = datetime.utcnow
+        self.nowfun = nowfun
+        self.default_now = self.nowfun
 
     def is_due(self, last_run_at):
         return self._is_due, self._next_run_at
@@ -371,6 +373,22 @@ class test_Scheduler:
         assert 'baz' in a.schedule
         assert a.schedule['bar'].schedule._next_run_at == 40
 
+    def test_when(self):
+        now_time_utc = datetime(2000, 10, 10, 10, 10, 10, 10, tzinfo=pytz.utc)
+        now_time_casey = now_time_utc.astimezone(
+            pytz.timezone('Antarctica/Casey')
+        )
+        scheduler = mScheduler(app=self.app)
+        result_utc = scheduler._when(
+            mocked_schedule(True, 10, lambda: now_time_utc),
+            10
+        )
+        result_casey = scheduler._when(
+            mocked_schedule(True, 10, lambda: now_time_casey),
+            10
+        )
+        assert result_utc == result_casey
+
     @patch('celery.beat.Scheduler._when', return_value=1)
     def test_populate_heap(self, _when):
         scheduler = mScheduler(app=self.app)