浏览代码

Add support for more complex crontab patterns.

This addition is twofold:

1. Added crontab_parser class, for parsing complex crontab patterns.

   Any expression of the form 'groups' (see BNF grammar below) is accepted and
   expanded to a set of numbers.  These numbers represent the units of time
   that the crontab needs to run on.

    digit   :: '0'..'9'
    star    :: '*'
    number  :: digit+
    steps   :: number
    range   :: number ( '-' number ) ?
    numspec :: star | range
    expr    :: numspec ( '/' steps ) ?
    groups  :: expr ( ',' expr ) *

   The parser (written using pyparsing) is a general purpose one, useful for
   parsing hours, minutes and day_of_week expressions.  Example usage:

    minutes = crontab_parser(60).parse("*/15")  # yields set([0,15,30,45])
    hours = crontab_parser(24).parse("*/4")     # yields set([0,4,8,12,16,20])
    day_of_week = crontab_parser(7).parse("*")  # yields set([0,1,2,3,4,5,6])

2. The crontab class's is_due method is rewritten (simplified) to test the
   current datetime's minute, hour and isoweekday values for inclusion in
   the sets generated by the crontab parser.

   Entering day_of_week values like 'mon', 'monday', etc. is still supported
   via a dirty (but at least small and isolated) hack that keeps all unit
   tests green.  Structural support will be included in the crontab parser
   in a future commit.
Vincent Driessen 15 年之前
父节点
当前提交
0ea73d8260
共有 3 个文件被更改,包括 258 次插入19 次删除
  1. 146 17
      celery/task/schedules.py
  2. 111 2
      celery/tests/test_task.py
  3. 1 0
      contrib/requirements/default.txt

+ 146 - 17
celery/task/schedules.py

@@ -1,4 +1,6 @@
 from datetime import datetime
 from datetime import datetime
+from collections import Iterable
+from pyparsing import Word, Literal, ZeroOrMore, Optional, Group, StringEnd
 
 
 from celery.utils.timeutils import timedelta_seconds, weekday, remaining
 from celery.utils.timeutils import timedelta_seconds, weekday, remaining
 
 
@@ -28,6 +30,86 @@ class schedule(object):
         return False, rem
         return False, rem
 
 
 
 
+class crontab_parser(object):
+    """Parser for crontab expressions. Any expression of the form 'groups' (see
+    BNF grammar below) is accepted and expanded to a set of numbers.  These
+    numbers represent the units of time that the crontab needs to run on.
+
+        digit   :: '0'..'9'
+        number  :: digit+
+        steps   :: number
+        range   :: number ( '-' number ) ?
+        numspec :: '*' | range
+        expr    :: numspec ( '/' steps ) ?
+        groups  :: expr ( ',' expr ) *
+
+    The parser is a general purpose one, useful for parsing hours, minutes and
+    day_of_week expressions.  Example usage::
+
+        minutes = crontab_parser(60).parse("*/15")  # yields [0,15,30,45]
+        hours = crontab_parser(24).parse("*/4")     # yields [0,4,8,12,16,20]
+        day_of_week = crontab_parser(7).parse("*")  # yields [0,1,2,3,4,5,6]
+
+    """
+
+    def __init__(self, max_=60):
+        # define the grammar structure
+        digits = "0123456789"
+        star = Literal('*')
+        number = Word(digits)
+        steps = number
+        range_ = number + Optional(Literal('-') + number)
+        numspec = star | range_
+        expr = Group(numspec) + Optional(Literal('/') + steps)
+        extra_groups = ZeroOrMore(Literal(',') + expr)
+        groups = expr + extra_groups + StringEnd()
+
+        # define parse actions
+        star.setParseAction(self._expand_star)
+        number.setParseAction(self._expand_number)
+        range_.setParseAction(self._expand_range)
+        expr.setParseAction(self._filter_steps)
+        extra_groups.setParseAction(self._ignore_comma)
+        groups.setParseAction(self._join_to_set)
+
+        self.max_ = max_
+        self.parser = groups
+
+    @staticmethod
+    def _expand_number(toks):
+        return [int(toks[0])]
+
+    @staticmethod
+    def _expand_range(toks):
+        if len(toks) > 1:
+            return range(toks[0], int(toks[2])+1)
+        else:
+            return toks[0]
+
+    def _expand_star(self, toks):
+        return range(self.max_)
+
+    @staticmethod
+    def _filter_steps(toks):
+        numbers = toks[0]
+        if len(toks) > 1:
+            steps = toks[2]
+            return [n for n in numbers if n % steps == 0]
+        else:
+            return numbers
+
+    @staticmethod
+    def _ignore_comma(toks):
+        return filter(lambda x: x != ',', toks)
+
+    @staticmethod
+    def _join_to_set(toks):
+        return set(toks.asList())
+
+    def parse(self, cronspec):
+        return self.parser.parseString(cronspec).pop()
+
+
 class crontab(schedule):
 class crontab(schedule):
     """A crontab can be used as the ``run_every`` value of a
     """A crontab can be used as the ``run_every`` value of a
     :class:`PeriodicTask` to add cron-like scheduling.
     :class:`PeriodicTask` to add cron-like scheduling.
@@ -56,15 +138,69 @@ class crontab(schedule):
 
 
     """
     """
 
 
-    def __init__(self, minute=None, hour=None, day_of_week=None,
+    @staticmethod
+    def _expand_cronspec(cronspec, max_):
+        """Takes the given cronspec argument in one of the forms::
+
+            int         (like 7)
+            basestring  (like '3-5,*/15', '*', or 'monday')
+            set         (like set([0,15,30,45]))
+            list        (like [8-17])
+
+        And convert it to an (expanded) set representing all time unit
+        values on which the crontab triggers.  Only in case of the base
+        type being 'basestring', parsing occurs.  (It is fast and
+        happens only once for each crontab instance, so there is no
+        significant performance overhead involved.)
+
+        For the other base types, merely Python type conversions happen.
+
+        The argument `max_` is needed to determine the expansion of '*'.
+
+        """
+        if isinstance(cronspec, int):
+            result = set([cronspec])
+        elif isinstance(cronspec, basestring):
+            result = crontab_parser(max_).parse(cronspec)
+        elif isinstance(cronspec, set):
+            result = cronspec
+        elif isinstance(cronspec, Iterable):
+            result = set(cronspec)
+        else:
+            raise TypeError("Argument cronspec needs to be of any of the " + \
+                    "following types: int, basestring, set, or Iterable. " + \
+                    "'%s' was given." % type(cronspec))
+
+        # assure the result does not exceed the max
+        for number in result:
+            if number >= max_:
+                raise ValueError("Invalid crontab pattern. Valid " + \
+                "range is 0-%d. '%d' was found." % (max_, number))
+
+        return result
+
+    def __init__(self, minute='*', hour='*', day_of_week='*',
             nowfun=datetime.now):
             nowfun=datetime.now):
-        self.hour = hour                  # (0 - 23)
-        self.minute = minute              # (0 - 59)
-        self.day_of_week = day_of_week    # (0 - 6) (Sunday=0)
-        self.nowfun = nowfun
 
 
-        if isinstance(self.day_of_week, basestring):
-            self.day_of_week = weekday(self.day_of_week)
+        # ---------------------------------------------------------------------
+        # TODO: Isolated HACK
+        aliases = {'sun': '0', 'sunday': '0',  \
+                    'mon': '1', 'monday': '1',  \
+                    'tue': '2', 'tuesday': '2', \
+                    'wed': '3', 'wednesday': '3', \
+                    'thu': '4', 'thursday': '4', \
+                    'fri': '5', 'friday': '5', \
+                    'sat': '6', 'saturday': '6'}
+        for key in sorted(aliases, lambda x, y: cmp(len(x), len(y)),
+                      reverse=True):
+            val = aliases[key]
+            day_of_week = day_of_week.replace(key,val)
+        # ---------------------------------------------------------------------
+
+        self.hour = self._expand_cronspec(hour, 24)
+        self.minute = self._expand_cronspec(minute, 60)
+        self.day_of_week = self._expand_cronspec(day_of_week, 7)
+        self.nowfun = nowfun
 
 
     def remaining_estimate(self, last_run_at):
     def remaining_estimate(self, last_run_at):
         # remaining_estimate controls the frequency of scheduler
         # remaining_estimate controls the frequency of scheduler
@@ -76,18 +212,11 @@ class crontab(schedule):
         last = now - last_run_at
         last = now - last_run_at
         due, when = False, 1
         due, when = False, 1
         if last.days > 0 or last.seconds > 60:
         if last.days > 0 or last.seconds > 60:
-            if self.day_of_week in (None, now.isoweekday()):
+            if now.isoweekday() in self.day_of_week:
                 due, when = self._check_hour_minute(now)
                 due, when = self._check_hour_minute(now)
         return due, when
         return due, when
 
 
     def _check_hour_minute(self, now):
     def _check_hour_minute(self, now):
-        due, when = False, 1
-        if self.hour is None and self.minute is None:
-            due, when = True, 1
-        if self.hour is None and self.minute == now.minute:
-            due, when = True, 1
-        if self.hour == now.hour and self.minute is None:
-            due, when = True, 1
-        if self.hour == now.hour and self.minute == now.minute:
-            due, when = True, 1
+        due = now.hour in self.hour and now.minute in self.minute
+        when = 1
         return due, when
         return due, when

+ 111 - 2
celery/tests/test_task.py

@@ -2,12 +2,14 @@ import unittest2 as unittest
 from StringIO import StringIO
 from StringIO import StringIO
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
 
 
+from pyparsing import ParseException
+
 from billiard.utils.functional import wraps
 from billiard.utils.functional import wraps
 
 
 from celery import conf
 from celery import conf
 from celery import task
 from celery import task
 from celery import messaging
 from celery import messaging
-from celery.task.schedules import crontab
+from celery.task.schedules import crontab, crontab_parser
 from celery.utils import timeutils
 from celery.utils import timeutils
 from celery.utils import gen_unique_id
 from celery.utils import gen_unique_id
 from celery.result import EagerResult
 from celery.result import EagerResult
@@ -487,6 +489,10 @@ class EveryMinutePeriodic(task.PeriodicTask):
     run_every = crontab()
     run_every = crontab()
 
 
 
 
+class QuarterlyPeriodic(task.PeriodicTask):
+    run_every = crontab(minute="*/15")
+
+
 class HourlyPeriodic(task.PeriodicTask):
 class HourlyPeriodic(task.PeriodicTask):
     run_every = crontab(minute=30)
     run_every = crontab(minute=30)
 
 
@@ -517,7 +523,86 @@ def patch_crontab_nowfun(cls, retval):
     return create_patcher
     return create_patcher
 
 
 
 
-class test_crontab(unittest.TestCase):
+class test_crontab_parser(unittest.TestCase):
+
+    def test_parse_star(self):
+        self.assertEquals(crontab_parser(24).parse('*'), set(range(24)))
+        self.assertEquals(crontab_parser(60).parse('*'), set(range(60)))
+        self.assertEquals(crontab_parser(7).parse('*'), set(range(7)))
+
+    def test_parse_range(self):
+        self.assertEquals(crontab_parser(60).parse('1-10'), set(range(1,10+1)))
+        self.assertEquals(crontab_parser(24).parse('0-20'), set(range(0,20+1)))
+        self.assertEquals(crontab_parser().parse('2-10'), set(range(2,10+1)))
+
+    def test_parse_groups(self):
+        self.assertEquals(crontab_parser().parse('1,2,3,4'), set([1,2,3,4]))
+        self.assertEquals(crontab_parser().parse('0,15,30,45'), set([0,15,30,45]))
+
+    def test_parse_steps(self):
+        self.assertEquals(crontab_parser(8).parse('*/2'), set([0,2,4,6]))
+        self.assertEquals(crontab_parser().parse('*/2'), set([ i*2 for i in xrange(30) ]))
+        self.assertEquals(crontab_parser().parse('*/3'), set([ i*3 for i in xrange(20) ]))
+
+    def test_parse_composite(self):
+        self.assertEquals(crontab_parser(8).parse('*/2'), set([0,2,4,6]))
+        self.assertEquals(crontab_parser().parse('2-9/5'), set([5]))
+        self.assertEquals(crontab_parser().parse('2-10/5'), set([5,10]))
+        self.assertEquals(crontab_parser().parse('2-11/5,3'), set([3,5,10]))
+        self.assertEquals(crontab_parser().parse('2-4/3,*/5,0-21/4'), set([0,3,4,5,8,10,12,15,16,20,25,30,35,40,45,50,55]))
+
+    def test_parse_errors_on_empty_string(self):
+        self.assertRaises(ParseException, crontab_parser(60).parse, '')
+
+    def test_parse_errors_on_empty_group(self):
+        self.assertRaises(ParseException, crontab_parser(60).parse, '1,,2')
+
+    def test_parse_errors_on_empty_steps(self):
+        self.assertRaises(ParseException, crontab_parser(60).parse, '*/')
+
+    def test_parse_errors_on_negative_number(self):
+        self.assertRaises(ParseException, crontab_parser(60).parse, '-20')
+
+
+class test_crontab_is_due(unittest.TestCase):
+
+    def test_default_crontab_spec(self):
+        c = crontab()
+        self.assertEquals(c.minute, set(range(60)))
+        self.assertEquals(c.hour, set(range(24)))
+        self.assertEquals(c.day_of_week, set(range(7)))
+
+    def test_simple_crontab_spec(self):
+        c = crontab(minute=30)
+        self.assertEquals(c.minute, set([30]))
+        self.assertEquals(c.hour, set(range(24)))
+        self.assertEquals(c.day_of_week, set(range(7)))
+
+    def test_crontab_spec_minute_formats(self):
+        c = crontab(minute=30)
+        self.assertEquals(c.minute, set([30]))
+        c = crontab(minute='30')
+        self.assertEquals(c.minute, set([30]))
+        c = crontab(minute=(30,40,50))
+        self.assertEquals(c.minute, set([30,40,50]))
+        c = crontab(minute=set([30,40,50]))
+        self.assertEquals(c.minute, set([30,40,50]))
+
+    def test_crontab_spec_invalid_minute(self):
+        self.assertRaises(ValueError, crontab, minute=60)
+        self.assertRaises(ValueError, crontab, minute='0-100')
+
+    def test_crontab_spec_hour_formats(self):
+        c = crontab(hour=6)
+        self.assertEquals(c.hour, set([6]))
+        c = crontab(hour='5')
+        self.assertEquals(c.hour, set([5]))
+        c = crontab(hour=(4,8,12))
+        self.assertEquals(c.hour, set([4,8,12]))
+
+    def test_crontab_spec_invalid_hour(self):
+        self.assertRaises(ValueError, crontab, hour=24)
+        self.assertRaises(ValueError, crontab, hour='0-30')
 
 
     def test_every_minute_execution_is_due(self):
     def test_every_minute_execution_is_due(self):
         last_ran = datetime.now() - timedelta(seconds=61)
         last_ran = datetime.now() - timedelta(seconds=61)
@@ -543,6 +628,30 @@ class test_crontab(unittest.TestCase):
         self.assertFalse(due)
         self.assertFalse(due)
         self.assertEquals(remaining, 1)
         self.assertEquals(remaining, 1)
 
 
+    @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 15))
+    def test_first_quarter_execution_is_due(self):
+        due, remaining = QuarterlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
+        self.assertTrue(due)
+        self.assertEquals(remaining, 1)
+
+    @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 30))
+    def test_second_quarter_execution_is_due(self):
+        due, remaining = QuarterlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
+        self.assertTrue(due)
+        self.assertEquals(remaining, 1)
+
+    @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 14))
+    def test_first_quarter_execution_is_not_due(self):
+        due, remaining = QuarterlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
+        self.assertFalse(due)
+        self.assertEquals(remaining, 1)
+
+    @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 29))
+    def test_second_quarter_execution_is_not_due(self):
+        due, remaining = QuarterlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
+        self.assertFalse(due)
+        self.assertEquals(remaining, 1)
+
     @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 7, 30))
     @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 7, 30))
     def test_daily_execution_is_due(self):
     def test_daily_execution_is_due(self):
         due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 9, 7, 30))
         due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 9, 7, 30))

+ 1 - 0
contrib/requirements/default.txt

@@ -5,3 +5,4 @@ anyjson
 carrot>=0.10.4
 carrot>=0.10.4
 django-picklefield
 django-picklefield
 billiard>=0.3.0
 billiard>=0.3.0
+pyparsing