Selaa lähdekoodia

Merge branch 'feature/advanced-crontab'

Vincent Driessen 15 vuotta sitten
vanhempi
commit
06321d7ea9
3 muutettua tiedostoa jossa 284 lisäystä ja 29 poistoa
  1. 158 27
      celery/task/schedules.py
  2. 125 2
      celery/tests/test_task.py
  3. 1 0
      contrib/requirements/default.txt

+ 158 - 27
celery/task/schedules.py

@@ -1,4 +1,6 @@
 from datetime import datetime
+from collections import Iterable
+from pyparsing import Word, Literal, ZeroOrMore, Optional, Group, StringEnd, alphas
 
 from celery.utils.timeutils import timedelta_seconds, weekday, remaining
 
@@ -28,44 +30,180 @@ class schedule(object):
         return False, rem
 
 
+class crontab_parser(object):
+    """Parser for crontab expressions. Any expression of the form 'groups' (see
+    BNF grammar below) is accepted and expanded to a set of numbers.  These
+    numbers represent the units of time that the crontab needs to run on.
+
+        digit   :: '0'..'9'
+        dow     :: 'a'..'z'
+        number  :: digit+ | dow+
+        steps   :: number
+        range   :: number ( '-' number ) ?
+        numspec :: '*' | range
+        expr    :: numspec ( '/' steps ) ?
+        groups  :: expr ( ',' expr ) *
+
+    The parser is a general purpose one, useful for parsing hours, minutes and
+    day_of_week expressions.  Example usage::
+
+        minutes = crontab_parser(60).parse("*/15")  # set([0, 15, 30, 45])
+        hours = crontab_parser(24).parse("*/4")     # set([0, 4, 8, 12, 16, 20])
+        day_of_week = crontab_parser(7).parse("*")  # set([0, 1, 2, 3, 4, 5, 6])
+
+    A step keeps those values of its range that are divisible by the step;
+    it does not count from the start of the range, so "2-10/5" expands to
+    5 and 10.  A step of zero raises :exc:`ValueError`.
+
+    """
+
+    def __init__(self, max_=60):
+        """Build the grammar; `max_` is the exclusive upper bound that '*'
+        expands to (60 for minutes, 24 for hours, 7 for days of week)."""
+        # define the grammar structure
+        digits = "0123456789"
+        star = Literal('*')
+        number = Word(digits) | Word(alphas)
+        steps = number
+        range_ = number + Optional(Literal('-') + number)
+        numspec = star | range_
+        expr = Group(numspec) + Optional(Literal('/') + steps)
+        extra_groups = ZeroOrMore(Literal(',') + expr)
+        groups = expr + extra_groups + StringEnd()
+
+        # define parse actions: each one rewrites the tokens matched by its
+        # grammar element, so the final token list holds plain numbers only
+        star.setParseAction(self._expand_star)
+        number.setParseAction(self._expand_number)
+        range_.setParseAction(self._expand_range)
+        expr.setParseAction(self._filter_steps)
+        extra_groups.setParseAction(self._ignore_comma)
+        groups.setParseAction(self._join_to_set)
+
+        self.max_ = max_
+        self.parser = groups
+
+    @staticmethod
+    def _expand_number(toks):
+        """Turn a digit token into an int; any other token is resolved
+        through weekday()."""
+        try:
+            i = int(toks[0])
+        except ValueError:
+            i = weekday(toks[0])
+        return [i]
+
+    @staticmethod
+    def _expand_range(toks):
+        """Expand [start, '-', end] to the inclusive list start..end; a
+        single number is passed through unchanged."""
+        if len(toks) > 1:
+            return range(toks[0], int(toks[2])+1)
+        else:
+            return toks[0]
+
+    def _expand_star(self, toks):
+        """Expand '*' to every value from 0 up to (excluding) max_."""
+        return range(self.max_)
+
+    @staticmethod
+    def _filter_steps(toks):
+        """Apply an optional '/steps' suffix to the grouped numbers.
+
+        toks is [numbers] or [numbers, '/', steps].  Only numbers divisible
+        by `steps` are kept.  Raises ValueError for a step of zero, which
+        would otherwise surface as a bare ZeroDivisionError.
+        """
+        numbers = toks[0]
+        if len(toks) > 1:
+            steps = toks[2]
+            if steps == 0:
+                raise ValueError("Invalid crontab pattern. "
+                                 "Step value must not be zero.")
+            return [n for n in numbers if n % steps == 0]
+        else:
+            return numbers
+
+    @staticmethod
+    def _ignore_comma(toks):
+        """Drop the ',' separators, keeping only the expanded numbers."""
+        return filter(lambda x: x != ',', toks)
+
+    @staticmethod
+    def _join_to_set(toks):
+        """Collapse all expanded numbers into one set (duplicates merge)."""
+        return set(toks.asList())
+
+    def parse(self, cronspec):
+        """Parse the string `cronspec` and return the set of numbers it
+        expands to.  Malformed input makes pyparsing raise its
+        ParseException."""
+        return self.parser.parseString(cronspec).pop()
+
+
 class crontab(schedule):
     """A crontab can be used as the ``run_every`` value of a
     :class:`PeriodicTask` to add cron-like scheduling.
 
     Like a :manpage:`cron` job, you can specify units of time of when
-    you would like the task to execute. While not a full implementation
-    of cron's features, it should provide a fair degree of common scheduling
-    needs.
+    you would like the task to execute. It is a reasonably complete
+    implementation of cron's features, so it should provide a fair
+    degree of scheduling needs.
 
-    You can specify a minute, an hour, and/or a day of the week.
+    You can specify a minute, an hour, and/or a day of the week in any
+    of the following formats:
 
     .. attribute:: minute
 
-        An integer from 0-59 that represents the minute of an hour of when
-        execution should occur.
+        - A (list of) integers from 0-59 that represent the minutes of
+          an hour of when execution should occur; or
+        - A string representing a crontab pattern.  This may get pretty
+          advanced, like `minute="*/15"` (for every quarter) or
+          `minute="1,13,30-45,50-59/2"`.
 
     .. attribute:: hour
 
-        An integer from 0-23 that represents the hour of a day of when
-        execution should occur.
+        - A (list of) integers from 0-23 that represent the hours of
+          a day of when execution should occur; or
+        - A string representing a crontab pattern.  This may get pretty
+          advanced, like `hour="*/3"` (for every three hours) or
+          `hour="0,8-17/2"` (at midnight, and every two hours during
+          office hours).
 
     .. attribute:: day_of_week
 
-        An integer from 0-6, where Sunday = 0 and Saturday = 6, that
-        represents the day of week that execution should occur.
+        - A (list of) integers from 0-6, where Sunday = 0 and Saturday =
+          6, that represent the days of a week that execution should
+          occur.
+        - A string representing a crontab pattern.  This may get pretty
+          advanced, like `day_of_week="mon-fri"` (for weekdays only).
+          (Beware that `day_of_week="*/2"` does not literally mean
+          "every two days", but "every day that is divisible by two"!)
 
     """
 
-    def __init__(self, minute=None, hour=None, day_of_week=None,
+    @staticmethod
+    def _expand_cronspec(cronspec, max_):
+        """Takes the given cronspec argument in one of the forms::
+
+            int         (like 7)
+            basestring  (like '3-5,*/15', '*', or 'monday')
+            set         (like set([0,15,30,45]))
+            list        (like [8,12,17], or any other iterable)
+
+        And convert it to an (expanded) set representing all time unit
+        values on which the crontab triggers.  Only in case of the base
+        type being 'basestring', parsing occurs.  (It is fast and
+        happens only once for each crontab instance, so there is no
+        significant performance overhead involved.)
+
+        For the other base types, merely Python type conversions happen.
+
+        The argument `max_` is needed to determine the expansion of '*'.
+        It is also the exclusive upper bound used for validation: every
+        resulting value must lie in the range 0 to `max_` - 1.
+
+        Raises TypeError if `cronspec` is of none of the supported types,
+        and ValueError if any resulting value is out of range.
+
+        """
+        if isinstance(cronspec, int):
+            result = set([cronspec])
+        elif isinstance(cronspec, basestring):
+            result = crontab_parser(max_).parse(cronspec)
+        elif isinstance(cronspec, set):
+            result = cronspec
+        elif isinstance(cronspec, Iterable):
+            result = set(cronspec)
+        else:
+            raise TypeError(
+                    "Argument cronspec needs to be of any of the "
+                    "following types: int, basestring, set, or Iterable. "
+                    "'%s' was given." % type(cronspec))
+
+        # assure the result stays within 0 .. max_ - 1; negative values can
+        # only arrive through the int/set/iterable forms, never the parser
+        for number in result:
+            if not 0 <= number < max_:
+                raise ValueError(
+                        "Invalid crontab pattern. Valid "
+                        "range is 0-%d. '%d' was found." % (max_ - 1, number))
+
+        return result
+
+    def __init__(self, minute='*', hour='*', day_of_week='*',
             nowfun=datetime.now):
-        self.hour = hour                  # (0 - 23)
-        self.minute = minute              # (0 - 59)
-        self.day_of_week = day_of_week    # (0 - 6) (Sunday=0)
+        self.hour = self._expand_cronspec(hour, 24)
+        self.minute = self._expand_cronspec(minute, 60)
+        self.day_of_week = self._expand_cronspec(day_of_week, 7)
         self.nowfun = nowfun
 
-        if isinstance(self.day_of_week, basestring):
-            self.day_of_week = weekday(self.day_of_week)
-
     def remaining_estimate(self, last_run_at):
         # remaining_estimate controls the frequency of scheduler
         # ticks. The scheduler needs to wake up every second in this case.
@@ -76,18 +214,11 @@ class crontab(schedule):
         last = now - last_run_at
         due, when = False, 1
         if last.days > 0 or last.seconds > 60:
-            if self.day_of_week in (None, now.isoweekday()):
+            if now.isoweekday() in self.day_of_week:
                 due, when = self._check_hour_minute(now)
         return due, when
 
     def _check_hour_minute(self, now):
-        due, when = False, 1
-        if self.hour is None and self.minute is None:
-            due, when = True, 1
-        if self.hour is None and self.minute == now.minute:
-            due, when = True, 1
-        if self.hour == now.hour and self.minute is None:
-            due, when = True, 1
-        if self.hour == now.hour and self.minute == now.minute:
-            due, when = True, 1
+        due = now.hour in self.hour and now.minute in self.minute
+        when = 1
         return due, when

+ 125 - 2
celery/tests/test_task.py

@@ -2,12 +2,14 @@ import unittest2 as unittest
 from StringIO import StringIO
 from datetime import datetime, timedelta
 
+from pyparsing import ParseException
+
 from billiard.utils.functional import wraps
 
 from celery import conf
 from celery import task
 from celery import messaging
-from celery.task.schedules import crontab
+from celery.task.schedules import crontab, crontab_parser
 from celery.utils import timeutils
 from celery.utils import gen_unique_id
 from celery.result import EagerResult
@@ -487,6 +489,10 @@ class EveryMinutePeriodic(task.PeriodicTask):
     run_every = crontab()
 
 
+class QuarterlyPeriodic(task.PeriodicTask):
+    # Test fixture scheduled with the "*/15" minute pattern; hour and
+    # day_of_week are left at crontab's defaults.
+    run_every = crontab(minute="*/15")
+
+
 class HourlyPeriodic(task.PeriodicTask):
     run_every = crontab(minute=30)
 
@@ -517,7 +523,100 @@ def patch_crontab_nowfun(cls, retval):
     return create_patcher
 
 
-class test_crontab(unittest.TestCase):
+class test_crontab_parser(unittest.TestCase):
+    """Checks crontab_parser on stars, ranges, groups, steps and bad input."""
+
+    def test_parse_star(self):
+        # '*' covers every value below the maximum given to the parser
+        for max_ in (24, 60, 7):
+            self.assertEquals(crontab_parser(max_).parse('*'),
+                              set(range(max_)))
+
+    def test_parse_range(self):
+        # ranges are inclusive at both ends
+        self.assertEquals(crontab_parser(60).parse('1-10'),
+                          set(range(1, 11)))
+        self.assertEquals(crontab_parser(24).parse('0-20'),
+                          set(range(0, 21)))
+        self.assertEquals(crontab_parser().parse('2-10'),
+                          set(range(2, 11)))
+
+    def test_parse_groups(self):
+        parse = crontab_parser().parse
+        self.assertEquals(parse('1,2,3,4'), set([1, 2, 3, 4]))
+        self.assertEquals(parse('0,15,30,45'), set([0, 15, 30, 45]))
+
+    def test_parse_steps(self):
+        self.assertEquals(crontab_parser(8).parse('*/2'),
+                          set([0, 2, 4, 6]))
+        # with the default maximum of 60, a step keeps the multiples < 60
+        self.assertEquals(crontab_parser().parse('*/2'),
+                          set(range(0, 60, 2)))
+        self.assertEquals(crontab_parser().parse('*/3'),
+                          set(range(0, 60, 3)))
+
+    def test_parse_composite(self):
+        self.assertEquals(crontab_parser(8).parse('*/2'),
+                          set([0, 2, 4, 6]))
+        parse = crontab_parser().parse
+        self.assertEquals(parse('2-9/5'), set([5]))
+        self.assertEquals(parse('2-10/5'), set([5, 10]))
+        self.assertEquals(parse('2-11/5,3'), set([3, 5, 10]))
+        self.assertEquals(
+            parse('2-4/3,*/5,0-21/4'),
+            set([0, 3, 4, 5, 8, 10, 12, 15, 16, 20,
+                 25, 30, 35, 40, 45, 50, 55]))
+
+    def test_parse_errors_on_empty_string(self):
+        parse = crontab_parser(60).parse
+        self.assertRaises(ParseException, parse, '')
+
+    def test_parse_errors_on_empty_group(self):
+        parse = crontab_parser(60).parse
+        self.assertRaises(ParseException, parse, '1,,2')
+
+    def test_parse_errors_on_empty_steps(self):
+        parse = crontab_parser(60).parse
+        self.assertRaises(ParseException, parse, '*/')
+
+    def test_parse_errors_on_negative_number(self):
+        parse = crontab_parser(60).parse
+        self.assertRaises(ParseException, parse, '-20')
+
+
+class test_crontab_is_due(unittest.TestCase):
+
+    def test_default_crontab_spec(self):
+        """A bare crontab() selects every minute, hour and day of week."""
+        spec = crontab()
+        expected_sizes = (('minute', 60), ('hour', 24), ('day_of_week', 7))
+        for field, size in expected_sizes:
+            self.assertEquals(getattr(spec, field), set(range(size)))
+
+    def test_simple_crontab_spec(self):
+        """Fixing only the minute leaves hour and day_of_week unrestricted."""
+        half_past = crontab(minute=30)
+        self.assertEquals(half_past.minute, set([30]))
+        # the untouched fields keep their '*' defaults
+        for field, size in (('hour', 24), ('day_of_week', 7)):
+            self.assertEquals(getattr(half_past, field), set(range(size)))
+
+    def test_crontab_spec_minute_formats(self):
+        """minute accepts an int, a digit string, a tuple and a set."""
+        cases = (
+            (30, set([30])),
+            ('30', set([30])),
+            ((30, 40, 50), set([30, 40, 50])),
+            (set([30, 40, 50]), set([30, 40, 50])),
+        )
+        for given, expected in cases:
+            self.assertEquals(crontab(minute=given).minute, expected)
+
+    def test_crontab_spec_invalid_minute(self):
+        """Minutes of 60 or above are rejected, as an int or in a pattern."""
+        for bad_minute in (60, '0-100'):
+            self.assertRaises(ValueError, crontab, minute=bad_minute)
+
+    def test_crontab_spec_hour_formats(self):
+        """hour accepts an int, a digit string and a tuple of ints."""
+        cases = (
+            (6, set([6])),
+            ('5', set([5])),
+            ((4, 8, 12), set([4, 8, 12])),
+        )
+        for given, expected in cases:
+            self.assertEquals(crontab(hour=given).hour, expected)
+
+    def test_crontab_spec_invalid_hour(self):
+        """Hours of 24 or above are rejected, as an int or in a pattern."""
+        for bad_hour in (24, '0-30'):
+            self.assertRaises(ValueError, crontab, hour=bad_hour)
+
+    def test_crontab_spec_dow_formats(self):
+        """day_of_week accepts ints, digit strings, day names and patterns."""
+        cases = (
+            (5, set([5])),
+            ('5', set([5])),
+            ('fri', set([5])),
+            ('tuesday,sunday,fri', set([0, 2, 5])),
+            ('mon-fri', set([1, 2, 3, 4, 5])),
+            ('*/2', set([0, 2, 4, 6])),
+        )
+        for given, expected in cases:
+            self.assertEquals(crontab(day_of_week=given).day_of_week,
+                              expected)
 
     def test_every_minute_execution_is_due(self):
         last_ran = datetime.now() - timedelta(seconds=61)
@@ -543,6 +642,30 @@ class test_crontab(unittest.TestCase):
         self.assertFalse(due)
         self.assertEquals(remaining, 1)
 
+    @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 15))
+    def test_first_quarter_execution_is_due(self):
+        """Minute 15 matches "*/15", so a run from 06:30 is due again."""
+        last_run = datetime(2010, 5, 10, 6, 30)
+        due, remaining = QuarterlyPeriodic().is_due(last_run)
+        self.assertTrue(due)
+        self.assertEquals(remaining, 1)
+
+    @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 30))
+    def test_second_quarter_execution_is_due(self):
+        """Minute 30 matches "*/15", so a run from 06:30 is due again."""
+        last_run = datetime(2010, 5, 10, 6, 30)
+        due, remaining = QuarterlyPeriodic().is_due(last_run)
+        self.assertTrue(due)
+        self.assertEquals(remaining, 1)
+
+    @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 14))
+    def test_first_quarter_execution_is_not_due(self):
+        """Minute 14 does not match "*/15", so nothing is due yet."""
+        last_run = datetime(2010, 5, 10, 6, 30)
+        due, remaining = QuarterlyPeriodic().is_due(last_run)
+        self.assertFalse(due)
+        self.assertEquals(remaining, 1)
+
+    @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 29))
+    def test_second_quarter_execution_is_not_due(self):
+        """Minute 29 does not match "*/15", so nothing is due yet."""
+        last_run = datetime(2010, 5, 10, 6, 30)
+        due, remaining = QuarterlyPeriodic().is_due(last_run)
+        self.assertFalse(due)
+        self.assertEquals(remaining, 1)
+
     @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 7, 30))
     def test_daily_execution_is_due(self):
         due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 9, 7, 30))

+ 1 - 0
contrib/requirements/default.txt

@@ -5,3 +5,4 @@ anyjson
 carrot>=0.10.4
 django-picklefield
 billiard>=0.3.0
+pyparsing