Jelajahi Sumber

Refactored ScheduledTask

New syntax:

    class EveryMorningAt7_30(PeriodicTask):
        run_every = crontab(hour=7, minute=30)

This means the run_every value is now responsible for is_due calculation,
by default. But old subclasses of PeriodicTask that overrides the is_due
method should still work.
Ask Solem 15 tahun lalu
induk
melakukan
ef3233470a
3 mengubah file dengan 198 tambahan dan 162 penghapusan
  1. 2 2
      celery/task/__init__.py
  2. 146 120
      celery/task/base.py
  3. 50 40
      celery/tests/test_task.py

+ 2 - 2
celery/task/__init__.py

@@ -8,14 +8,14 @@ from billiard.serialization import pickle
 from celery.execute import apply_async
 from celery.registry import tasks
 from celery.task.base import Task, TaskSet, PeriodicTask, ExecuteRemoteTask
-from celery.task.base import ScheduledTask
+from celery.task.base import crontab
 from celery.task.control import discard_all
 from celery.task.builtins import PingTask
 from celery.task.http import HttpDispatchTask
 
 __all__ = ["Task", "TaskSet", "PeriodicTask", "tasks", "discard_all",
            "dmap", "dmap_async", "execute_remote", "ping", "HttpDispatchTask",
-           "ScheduledTask"]
+           "crontab"]
 
 
 def dmap(fun, args, timeout=None):

+ 146 - 120
celery/task/base.py

@@ -16,10 +16,6 @@ from celery.messaging import establish_connection as _establish_connection
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 
 
-def get_current_time():
-    return datetime.now()
-
-
 class TaskType(type):
     """Metaclass for tasks.
 
@@ -656,55 +652,12 @@ class TaskSet(object):
         return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
 
 
-class PeriodicTask(Task):
-    """A periodic task is a task that behaves like a :manpage:`cron` job.
-
-    Results of periodic tasks are not stored by default.
-
-    .. attribute:: run_every
-
-        *REQUIRED* Defines how often the task is run (its interval),
-        it can be either a :class:`datetime.timedelta` object or an
-        integer specifying the time in seconds.
-
-    .. attribute:: relative
-
-        If set to ``True``, run times are relative to the time when the
-        server was started. This was the previous behaviour, periodic tasks
-        are now scheduled by the clock.
-
-    :raises NotImplementedError: if the :attr:`run_every` attribute is
-        not defined.
-
-    Example
-
-        >>> from celery.task import tasks, PeriodicTask
-        >>> from datetime import timedelta
-        >>> class MyPeriodicTask(PeriodicTask):
-        ...     run_every = timedelta(seconds=30)
-        ...
-        ...     def run(self, **kwargs):
-        ...         logger = self.get_logger(**kwargs)
-        ...         logger.info("Running MyPeriodicTask")
-
-    """
-    abstract = True
-    ignore_result = True
-    type = "periodic"
+class schedule(object):
     relative = False
 
-    def __init__(self):
-        if not hasattr(self, "run_every"):
-            raise NotImplementedError(
-                    "Periodic tasks must have a run_every attribute")
-
-        # If run_every is a integer, convert it to timedelta seconds.
-        # Operate on the original class attribute so anyone accessing
-        # it directly gets the right value.
-        if isinstance(self.__class__.run_every, int):
-            self.__class__.run_every = timedelta(seconds=self.run_every)
-
-        super(PeriodicTask, self).__init__()
+    def __init__(self, run_every=None, relative=False):
+        self.run_every = run_every
+        self.relative = relative
 
     def remaining_estimate(self, last_run_at):
         """Returns when the periodic task should run next as a timedelta."""
@@ -713,39 +666,17 @@ class PeriodicTask(Task):
             next_run_at = self.delta_resolution(next_run_at, self.run_every)
         return next_run_at - datetime.now()
 
-    def timedelta_seconds(self, delta):
-        """Convert :class:`datetime.timedelta` to seconds.
-
-        Doesn't account for negative timedeltas.
-
-        """
-        return timedelta_seconds(delta)
-
     def is_due(self, last_run_at):
         """Returns tuple of two items ``(is_due, next_time_to_run)``,
         where next time to run is in seconds.
 
-        e.g.
-
-        * ``(True, 20)``, means the task should be run now, and the next
-            time to run is in 20 seconds.
-
-        * ``(False, 12)``, means the task should be run in 12 seconds.
-
-        You can override this to decide the interval at runtime,
-        but keep in mind the value of ``CELERYBEAT_MAX_LOOP_INTERVAL``, which
-        decides the maximum number of seconds celerybeat can sleep between
-        re-checking the periodic task intervals. So if you dynamically change
-        the next run at value, and the max interval is set to 5 minutes, it
-        will take 5 minutes for the change to take effect, so you may
-        consider lowering the value of ``CELERYBEAT_MAX_LOOP_INTERVAL`` if
-        responsiveness if of importance to you.
+        See :meth:`PeriodicTask.is_due` for more information.
 
         """
         rem_delta = self.remaining_estimate(last_run_at)
-        rem = self.timedelta_seconds(rem_delta)
+        rem = timedelta_seconds(rem_delta)
         if rem == 0:
-            return True, self.timedelta_seconds(self.run_every)
+            return True, timedelta_seconds(self.run_every)
         return False, rem
 
     def delta_resolution(self, dt, delta):
@@ -769,7 +700,7 @@ class PeriodicTask(Task):
             datetime.datetime(2010, 3, 30, 11, 50, 58, 41065)
 
         """
-        delta = self.timedelta_seconds(delta)
+        delta = timedelta_seconds(delta)
 
         resolutions = ((3, lambda x: x / 86400),
                        (4, lambda x: x / 3600),
@@ -782,14 +713,16 @@ class PeriodicTask(Task):
         return dt
 
 
-class ScheduledTask(PeriodicTask):
-    """A scheduled task is a task that adds more precise scheduling to the
-    features of a :class:`celery.task.base.PeriodicTask`.
+class crontab(schedule):
+    """A crontab can be used as the ``run_every`` value of a
+    :class:`PeriodicTask` to add cron-like scheduling.
+
+    Like a :manpage:`cron` job, you can specify units of time of when
+    you would like the task to execute. While not a full implementation
+    of cron's features, it should provide a fair degree of common scheduling
+    needs.
 
-    Like a :manpage:`cron` job, you can specify units of time of when you would
-    like the task to execute.  While not a full implementation of cron, it
-    should provide a fair degree of common scheduling needs.  You can specify
-    a minute, an hour, and/or a day of the week.
+    You can specify a minute, an hour, and/or a day of the week.
 
     .. attribute:: minute
 
@@ -803,59 +736,152 @@ class ScheduledTask(PeriodicTask):
 
     .. attribute:: day_of_week
 
-        An integer from 0-6, where Sunday = 0 and Saturday = 6, that represents
-        the day of week that execution should occur.
+        An integer from 0-6, where Sunday = 0 and Saturday = 6, that
+        represents the day of week that execution should occur.
+
+    """
+    def __init__(self, minute=None, hour=None, day_of_week=None,
+            nowfun=datetime.now):
+        self.hour = hour                  # (0 - 23)
+        self.minute = minute              # (0 - 59)
+        self.day_of_week = day_of_week    # (0 - 6) (Sunday=0)
+        self.nowfun = nowfun
+
+    def remaining_estimate(self, last_run_at):
+        # remaining_estimate controls the frequency of scheduler
+        # ticks. The scheduler needs to wake up every second in this case.
+        return 1
+
+    def is_due(self, last_run_at):
+        now = self.nowfun()
+        last = now - last_run_at
+        due, when = False, 1
+        if last.days > 0 or last.seconds > 60:
+            if self.day_of_week in (None, now.isoweekday()):
+                due, when = self._check_hour_minute(now)
+        return due, when
+
+    def _check_hour_minute(self, now):
+        due, when = False, 1
+        if self.hour is None and self.minute is None:
+            due, when = True, 1
+        if self.hour is None and self.minute == now.minute:
+            due, when = True, 1
+        if self.hour == now.hour and self.minute is None:
+            due, when = True, 1
+        if self.hour == now.hour and self.minute == now.minute:
+            due, when = True, 1
+        return due, when
+
+
+class PeriodicTask(Task):
+    """A periodic task is a task that behaves like a :manpage:`cron` job.
+
+    Results of periodic tasks are not stored by default.
+
+    .. attribute:: run_every
+
+        *REQUIRED* Defines how often the task is run (its interval),
+        it can be a :class:`datetime.timedelta` object, a :class:`crontab`
+        object or an integer specifying the time in seconds.
+
+    .. attribute:: relative
+
+        If set to ``True``, run times are relative to the time when the
+        server was started. This was the previous behaviour, periodic tasks
+        are now scheduled by the clock.
+
+    :raises NotImplementedError: if the :attr:`run_every` attribute is
+        not defined.
 
     Example
 
-        >>> from celery.task import ScheduledTask
-        >>> class EveryMondayMorningTask(ScheduledTask):
-        ...     hour = 7
-        ...     minute = 30
-        ...     day_of_week = 1
+        >>> from celery.task import tasks, PeriodicTask
+        >>> from datetime import timedelta
+        >>> class EveryThirtySecondsTask(PeriodicTask):
+        ...     run_every = timedelta(seconds=30)
+        ...
+        ...     def run(self, **kwargs):
+        ...         logger = self.get_logger(**kwargs)
+        ...         logger.info("Execute every 30 seconds")
+
+        >>> from celery.task import PeriodicTask, crontab
+        >>> class EveryMondayMorningTask(PeriodicTask):
+        ...     run_every = crontab(hour=7, minute=30, day_of_week=1)
+        ...
         ...     def run(self, **kwargs):
         ...         logger = self.get_logger(**kwargs)
         ...         logger.info("Execute every Monday at 7:30AM.")
 
-        >>> from celery.task import ScheduledTask
-        >>> class EveryMorningTask(ScheduledTask):
-        ...     hour = 7
-        ...     minute = 30
+        >>> class EveryMorningTask(PeriodicTask):
+        ...     run_every = crontab(hours=7, minute=30)
+        ...
         ...     def run(self, **kwargs):
         ...         logger = self.get_logger(**kwargs)
         ...         logger.info("Execute every day at 7:30AM.")
 
-        >>> from celery.task import ScheduledTask
-        >>> class EveryQuarterPastTheHourTask(ScheduledTask):
-        ...     minute = 15
+        >>> class EveryQuarterPastTheHourTask(PeriodicTask):
+        ...     run_every = crontab(minute=15)
+
         ...     def run(self, **kwargs):
         ...         logger = self.get_logger(**kwargs)
         ...         logger.info("Execute every 0:15 past the hour every day.")
 
     """
-    run_every = timedelta(seconds=1)
-    hour = None           # (0 - 23)
-    minute = None         # (0 - 59)
-    day_of_week = None    # (0 - 6) (Sunday=0)
     abstract = True
+    ignore_result = True
+    type = "periodic"
+    relative = False
 
-    def check_hour_minute(self, now):
-        (due, when) = (False, 1)
-        if self.hour is None and self.minute is None:
-            (due, when) = (True, 1)
-        if self.hour is None and self.minute == now.minute:
-            (due, when) = (True, 1)
-        if self.hour == now.hour and self.minute is None:
-            (due, when) = (True, 1)
-        if self.hour == now.hour and self.minute == now.minute:
-            (due, when) = (True, 1)
-        return (due, when)
+    def __init__(self):
+        if not hasattr(self, "run_every"):
+            raise NotImplementedError(
+                    "Periodic tasks must have a run_every attribute")
+
+        # If run_every is a integer, convert it to timedelta seconds.
+        # Operate on the original class attribute so anyone accessing
+        # it directly gets the right value.
+        if isinstance(self.__class__.run_every, int):
+            self.__class__.run_every = timedelta(seconds=self.run_every)
+
+        # Convert timedelta to instance of schedule.
+        if isinstance(self.__class__.run_every, timedelta):
+            self.__class__.run_every = schedule(self.__class__.run_every,
+                                                self.relative)
+
+        super(PeriodicTask, self).__init__()
+
+    def timedelta_seconds(self, delta):
+        """Convert :class:`datetime.timedelta` to seconds.
+
+        Doesn't account for negative timedeltas.
+
+        """
+        return timedelta_seconds(delta)
 
     def is_due(self, last_run_at):
-        n = get_current_time()
-        last = (n - last_run_at)
-        (due, when) = (False, 1)
-        if last.days > 0 or last.seconds > 60:
-            if self.day_of_week in (None, n.isoweekday()):
-                (due, when) = self.check_hour_minute(n)
-        return (due, when)
+        """Returns tuple of two items ``(is_due, next_time_to_run)``,
+        where next time to run is in seconds.
+
+        e.g.
+
+        * ``(True, 20)``, means the task should be run now, and the next
+            time to run is in 20 seconds.
+
+        * ``(False, 12)``, means the task should be run in 12 seconds.
+
+        You can override this to decide the interval at runtime,
+        but keep in mind the value of ``CELERYBEAT_MAX_LOOP_INTERVAL``, which
+        decides the maximum number of seconds celerybeat can sleep between
+        re-checking the periodic task intervals. So if you dynamically change
+        the next run at value, and the max interval is set to 5 minutes, it
+        will take 5 minutes for the change to take effect, so you may
+        consider lowering the value of ``CELERYBEAT_MAX_LOOP_INTERVAL`` if
+        responsiveness if of importance to you.
+
+        """
+        return self.run_every.is_due(last_run_at)
+
+    def remaining_estimate(self, last_run_at):
+        """Returns when the periodic task should run next as a timedelta."""
+        return self.run_every.remaining_estimate(last_run_at)

+ 50 - 40
celery/tests/test_task.py

@@ -1,7 +1,8 @@
 import unittest2 as unittest
 from StringIO import StringIO
 from datetime import datetime, timedelta
-from mock import patch
+
+from billiard.utils.functional import wraps
 
 from celery import task
 from celery import messaging
@@ -446,7 +447,7 @@ class TestPeriodicTask(unittest.TestCase):
             self.assertEqual(MyPeriodic().timedelta_seconds(delta), seconds)
 
     def test_delta_resolution(self):
-        D = MyPeriodic().delta_resolution
+        D = MyPeriodic().run_every.delta_resolution
 
         dt = datetime(2010, 3, 30, 11, 50, 58, 41065)
         deltamap = ((timedelta(days=2), datetime(2010, 3, 30, 0, 0)),
@@ -463,83 +464,92 @@ class TestPeriodicTask(unittest.TestCase):
 
     def test_is_due(self):
         p = MyPeriodic()
-        due, remaining = p.is_due(datetime.now() - p.run_every)
+        due, remaining = p.is_due(datetime.now() - p.run_every.run_every)
         self.assertTrue(due)
-        self.assertEqual(remaining, p.timedelta_seconds(p.run_every))
+        self.assertEqual(remaining,
+                         p.timedelta_seconds(p.run_every.run_every))
+
+
+class EveryMinutePeriodic(task.PeriodicTask):
+    run_every = task.crontab()
+
+
+class HourlyPeriodic(task.PeriodicTask):
+    run_every = task.crontab(minute=30)
+
+
+class DailyPeriodic(task.PeriodicTask):
+    run_every = task.crontab(hour=7, minute=30)
 
 
-class EveryMinutePeriodic(task.ScheduledTask):
-    pass
+class WeeklyPeriodic(task.PeriodicTask):
+    run_every = task.crontab(hour=7, minute=30, day_of_week=4)
 
 
-class HourlyPeriodic(task.ScheduledTask):
-    minute = 30
+def patch_crontab_nowfun(cls, retval):
 
+    def create_patcher(fun):
 
-class DailyPeriodic(task.ScheduledTask):
-    hour = 7
-    minute = 30
+        @wraps(fun)
+        def __inner(*args, **kwargs):
+            prev_nowfun = cls.run_every.nowfun
+            cls.run_every.nowfun = lambda: retval
+            try:
+                return fun(*args, **kwargs)
+            finally:
+                cls.run_every.nowfun = prev_nowfun
+
+        return __inner
 
+    return create_patcher
 
-class WeeklyPeriodic(task.ScheduledTask):
-    hour = 7
-    minute = 30
-    day_of_week = 4
 
+class test_crontab(unittest.TestCase):
 
-class TestScheduledTask(unittest.TestCase):
-    
     def test_every_minute_execution_is_due(self):
         last_ran = datetime.now() - timedelta(seconds=61)
         due, remaining = EveryMinutePeriodic().is_due(last_ran)
         self.assertTrue(due)
         self.assertEquals(remaining, 1)
-        
+
     def test_every_minute_execution_is_not_due(self):
         last_ran = datetime.now() - timedelta(seconds=30)
         due, remaining = EveryMinutePeriodic().is_due(last_ran)
         self.assertFalse(due)
         self.assertEquals(remaining, 1)
-    
-    @patch('celery.task.base.get_current_time')
-    def test_every_hour_execution_is_due(self, NowMock):
-        NowMock.return_value = datetime(2010, 5, 10, 10, 30)
+
+    @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 10, 10, 30))
+    def test_every_hour_execution_is_due(self):
         due, remaining = HourlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
         self.assertEquals(remaining, 1)
-    
-    @patch('celery.task.base.get_current_time')
-    def test_every_hour_execution_is_not_due(self, NowMock):
-        NowMock.return_value = datetime(2010, 5, 10, 10, 29)
-        due, remaining = HourlyPeriodic().is_due(datetime(2010, 5, 10, 9, 30))
+
+    @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 10, 10, 30))
+    def test_every_hour_execution_is_not_due(self):
+        due, remaining = HourlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
         self.assertFalse(due)
         self.assertEquals(remaining, 1)
 
-    @patch('celery.task.base.get_current_time')
-    def test_daily_execution_is_due(self, NowMock):
-        NowMock.return_value = datetime(2010, 5, 10, 7, 30)
+    @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 7, 30))
+    def test_daily_execution_is_due(self):
         due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 9, 7, 30))
         self.assertTrue(due)
         self.assertEquals(remaining, 1)
 
-    @patch('celery.task.base.get_current_time')
-    def test_daily_execution_is_not_due(self, NowMock):
-        NowMock.return_value = datetime(2010, 5, 10, 10, 30)
+    @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 10, 30))
+    def test_daily_execution_is_not_due(self):
         due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 10, 6, 29))
         self.assertFalse(due)
         self.assertEquals(remaining, 1)
 
-    @patch('celery.task.base.get_current_time')
-    def test_weekly_execution_is_due(self, NowMock):
-        NowMock.return_value = datetime(2010, 5, 6, 7, 30)
+    @patch_crontab_nowfun(WeeklyPeriodic, datetime(2010, 5, 6, 7, 30))
+    def test_weekly_execution_is_due(self):
         due, remaining = WeeklyPeriodic().is_due(datetime(2010, 4, 30, 7, 30))
         self.assertTrue(due)
         self.assertEquals(remaining, 1)
 
-    @patch('celery.task.base.get_current_time')
-    def test_weekly_execution_is_not_due(self, NowMock):
-        NowMock.return_value = datetime(2010, 5, 7, 10, 30)
+    @patch_crontab_nowfun(WeeklyPeriodic, datetime(2010, 5, 7, 10, 30))
+    def test_weekly_execution_is_not_due(self):
         due, remaining = WeeklyPeriodic().is_due(datetime(2010, 4, 30, 6, 29))
         self.assertFalse(due)
         self.assertEquals(remaining, 1)
-