Browse Source

Proper remaining_estimate for crontabs. No need to wake up the scheduler every second.

jbochi 14 years ago
parent
commit
27b12d0e89
2 changed files with 137 additions and 36 deletions
  1. 57 11
      celery/schedules.py
  2. 80 25
      celery/tests/test_task.py

+ 57 - 11
celery/schedules.py

@@ -1,4 +1,5 @@
 from datetime import datetime, timedelta
+from dateutil import relativedelta as rd
 from pyparsing import (Word, Literal, ZeroOrMore, Optional,
                        Group, StringEnd, alphas)
 
@@ -229,19 +230,63 @@ class crontab(schedule):
                                  self._orig_day_of_week), None)
 
     def remaining_estimate(self, last_run_at):
-        # remaining_estimate controls the frequency of scheduler
-        # ticks. The scheduler needs to wake up every second in this case.
-        return 1
+        """Returns when the periodic task should run next as a timedelta."""
+        weekday = last_run_at.isoweekday()
+        execute_this_hour = weekday in self.day_of_week \
+                            and last_run_at.hour in self.hour \
+                            and last_run_at.minute < max(self.minute)
+
+        if execute_this_hour:
+            next_minute = min([minute for minute in self.minute
+                               if minute > last_run_at.minute])
+            delta = rd.relativedelta(minute=next_minute,
+                                     second=0,
+                                     microsecond=0)
+        else:
+            next_minute = min(self.minute)
+
+            execute_today = weekday in self.day_of_week \
+                            and (last_run_at.hour < max(self.hour) or \
+                                 execute_this_hour)
+
+            if execute_today:
+                next_hour = min([hour for hour in self.hour if \
+                                 hour > last_run_at.hour])
+                delta = rd.relativedelta(hour=next_hour,
+                                         minute=next_minute,
+                                         second=0,
+                                         microsecond=0)
+            else:
+                next_hour = min(self.hour)
+                iso_next_day = min([day for day in self.day_of_week if \
+                                    day > weekday] or self.day_of_week)
+                add_week = iso_next_day == weekday
+                
+                delta = rd.relativedelta(weeks=1 if add_week else 0,
+                                         weekday=(iso_next_day - 1) % 7,
+                                         hour=next_hour,
+                                         minute=next_minute,
+                                         second=0,
+                                         microsecond=0)
+                
+        return remaining(last_run_at, delta, now=self.nowfun())
 
     def is_due(self, last_run_at):
-        now = self.nowfun()
-        last = now - last_run_at
-        due, when = False, 1
-        if last.days > 0 or last.seconds > 60:
-            due = (now.isoweekday() % 7 in self.day_of_week and
-                   now.hour in self.hour and
-                   now.minute in self.minute)
-        return due, when
+        """Returns tuple of two items ``(is_due, next_time_to_run)``,
+        where next time to run is in seconds.
+
+        See :meth:`celery.task.base.PeriodicTask.is_due` for more information.
+
+        """
+        rem_delta = self.remaining_estimate(last_run_at)
+        rem = timedelta_seconds(rem_delta)
+        due = rem == 0
+        if due:
+            rem_delta = self.remaining_estimate(last_run_at=self.nowfun())
+            rem = timedelta_seconds(rem_delta)
+            
+        return due, rem
+
 
     def __eq__(self, other):
         if isinstance(other, crontab):
@@ -257,3 +302,4 @@ def maybe_schedule(s, relative=False):
     if isinstance(s, timedelta):
         return schedule(s, relative)
     return s
+

+ 80 - 25
celery/tests/test_task.py

@@ -575,8 +575,62 @@ class test_crontab_parser(unittest.TestCase):
         self.assertRaises(ParseException, crontab_parser(60).parse, '-20')
 
 
+class test_crontab_remaining_estimate(unittest.TestCase):
+
+    def next_ocurrance(self, crontab, now):
+        crontab.nowfun = lambda: now
+        return now + crontab.remaining_estimate(now)
+
+    def test_next_minute(self):
+        next = self.next_ocurrance(crontab(),
+                                   datetime(2010, 9, 11, 14, 30, 15))
+        self.assertEquals(next, datetime(2010, 9, 11, 14, 31))
+
+    def test_not_next_minute(self):
+        next = self.next_ocurrance(crontab(),
+                                   datetime(2010, 9, 11, 14, 59, 15))
+        self.assertEquals(next, datetime(2010, 9, 11, 15, 0))
+
+    def test_this_hour(self):
+        next = self.next_ocurrance(crontab(minute=[5, 42]),
+                                   datetime(2010, 9, 11, 14, 30, 15))
+        self.assertEquals(next, datetime(2010, 9, 11, 14, 42))
+
+    def test_not_this_hour(self):
+        next = self.next_ocurrance(crontab(minute=[5, 10, 15]),
+                                   datetime(2010, 9, 11, 14, 30, 15))
+        self.assertEquals(next, datetime(2010, 9, 11, 15, 5))
+
+    def test_today(self):
+        next = self.next_ocurrance(crontab(minute=[5, 42], hour=[12, 17]),
+                                   datetime(2010, 9, 11, 14, 30, 15))
+        self.assertEquals(next, datetime(2010, 9, 11, 17, 5))
+
+    def test_not_today(self):
+        next = self.next_ocurrance(crontab(minute=[5, 42], hour=[12]),
+                                   datetime(2010, 9, 11, 14, 30, 15))
+        self.assertEquals(next, datetime(2010, 9, 12, 12, 5))
+
+    def test_weekday(self):
+        next = self.next_ocurrance(crontab(minute=30,
+                                           hour=14,
+                                           day_of_week="sat"),
+                                   datetime(2010, 9, 11, 14, 30, 15))
+        self.assertEquals(next, datetime(2010, 9, 18, 14, 30))
+
+    def test_not_weekday(self):
+        next = self.next_ocurrance(crontab(minute=[5, 42],
+                                           day_of_week="mon-fri"),
+                                   datetime(2010, 9, 11, 14, 30, 15))
+        self.assertEquals(next, datetime(2010, 9, 13, 0, 5))
+
+
 class test_crontab_is_due(unittest.TestCase):
 
+    def setUp(self):
+        self.now = datetime.now()
+        self.next_minute = 60 - self.now.second - 1e-6 * self.now.microsecond
+
     def test_default_crontab_spec(self):
         c = crontab()
         self.assertEquals(c.minute, set(range(60)))
@@ -636,101 +690,102 @@ class test_crontab_is_due(unittest.TestCase):
         self.assertRaises(ValueError, crontab, day_of_week='12')
 
     def test_every_minute_execution_is_due(self):
-        last_ran = datetime.now() - timedelta(seconds=61)
+        last_ran = self.now - timedelta(seconds=61)
         due, remaining = EveryMinutePeriodic().is_due(last_ran)
         self.assertTrue(due)
-        self.assertEquals(remaining, 1)
+        self.assertAlmostEquals(remaining, self.next_minute)
 
     def test_every_minute_execution_is_not_due(self):
-        last_ran = datetime.now() - timedelta(seconds=30)
+        last_ran = self.now - timedelta(seconds=self.now.second)
         due, remaining = EveryMinutePeriodic().is_due(last_ran)
         self.assertFalse(due)
-        self.assertEquals(remaining, 1)
+        self.assertAlmostEquals(remaining, self.next_minute)
 
     # 29th of May 2010 is a saturday
     @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 29, 10, 30))
     def test_execution_is_due_on_saturday(self):
-        last_ran = datetime.now() - timedelta(seconds=61)
+        last_ran = self.now - timedelta(seconds=61)
         due, remaining = EveryMinutePeriodic().is_due(last_ran)
         self.assertTrue(due)
-        self.assertEquals(remaining, 1)
+        self.assertAlmostEquals(remaining, self.next_minute)
 
     # 30th of May 2010 is a sunday
     @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 30, 10, 30))
     def test_execution_is_due_on_sunday(self):
-        last_ran = datetime.now() - timedelta(seconds=61)
+        last_ran = self.now - timedelta(seconds=61)
         due, remaining = EveryMinutePeriodic().is_due(last_ran)
         self.assertTrue(due)
-        self.assertEquals(remaining, 1)
+        self.assertAlmostEquals(remaining, self.next_minute)
 
     # 31st of May 2010 is a monday
     @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 31, 10, 30))
     def test_execution_is_due_on_monday(self):
-        last_ran = datetime.now() - timedelta(seconds=61)
+        last_ran = self.now - timedelta(seconds=61)
         due, remaining = EveryMinutePeriodic().is_due(last_ran)
         self.assertTrue(due)
-        self.assertEquals(remaining, 1)
+        self.assertAlmostEquals(remaining, self.next_minute)
 
     @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 10, 10, 30))
     def test_every_hour_execution_is_due(self):
         due, remaining = HourlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 1)
+        self.assertEquals(remaining, 60*60)
 
     @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 10, 10, 29))
     def test_every_hour_execution_is_not_due(self):
-        due, remaining = HourlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
+        due, remaining = HourlyPeriodic().is_due(datetime(2010, 5, 10, 9, 30))
         self.assertFalse(due)
-        self.assertEquals(remaining, 1)
+        self.assertEquals(remaining, 60)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 15))
     def test_first_quarter_execution_is_due(self):
         due, remaining = QuarterlyPeriodic().is_due(
                             datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 1)
+        self.assertEquals(remaining, 15*60)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 30))
     def test_second_quarter_execution_is_due(self):
         due, remaining = QuarterlyPeriodic().is_due(
                             datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 1)
+        self.assertEquals(remaining, 15*60)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 14))
     def test_first_quarter_execution_is_not_due(self):
         due, remaining = QuarterlyPeriodic().is_due(
-                            datetime(2010, 5, 10, 6, 30))
+                            datetime(2010, 5, 10, 10, 0))
         self.assertFalse(due)
-        self.assertEquals(remaining, 1)
+        self.assertEquals(remaining, 60)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 29))
     def test_second_quarter_execution_is_not_due(self):
         due, remaining = QuarterlyPeriodic().is_due(
-                            datetime(2010, 5, 10, 6, 30))
+                            datetime(2010, 5, 10, 10, 15))
         self.assertFalse(due)
-        self.assertEquals(remaining, 1)
+        self.assertEquals(remaining, 60)
 
     @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 7, 30))
     def test_daily_execution_is_due(self):
         due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 9, 7, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 1)
+        self.assertEquals(remaining, 24*60*60)
 
     @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 10, 30))
     def test_daily_execution_is_not_due(self):
-        due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 10, 6, 29))
+        due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 10, 7, 30))
         self.assertFalse(due)
-        self.assertEquals(remaining, 1)
+        self.assertEquals(remaining, 21*60*60)
 
     @patch_crontab_nowfun(WeeklyPeriodic, datetime(2010, 5, 6, 7, 30))
     def test_weekly_execution_is_due(self):
         due, remaining = WeeklyPeriodic().is_due(datetime(2010, 4, 30, 7, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 1)
+        self.assertEquals(remaining, 7*24*60*60)
 
     @patch_crontab_nowfun(WeeklyPeriodic, datetime(2010, 5, 7, 10, 30))
     def test_weekly_execution_is_not_due(self):
-        due, remaining = WeeklyPeriodic().is_due(datetime(2010, 4, 30, 6, 29))
+        due, remaining = WeeklyPeriodic().is_due(datetime(2010, 5, 6, 7, 30))
         self.assertFalse(due)
-        self.assertEquals(remaining, 1)
+        self.assertEquals(remaining, 6*24*60*60 - 3*60*60)
+