Browse Source

Moved schedule + crontab from celery.task.base -> celery.task.schedules

Ask Solem 15 years ago
parent
commit
a332c83402
4 changed files with 133 additions and 7 deletions
  1. 1 3
      celery/task/__init__.py
  2. 93 0
      celery/task/schedules.py
  3. 5 4
      celery/tests/test_task.py
  4. 34 0
      celery/utils/timeutils.py

+ 1 - 3
celery/task/__init__.py

@@ -8,14 +8,12 @@ from billiard.serialization import pickle
 from celery.execute import apply_async
 from celery.registry import tasks
 from celery.task.base import Task, TaskSet, PeriodicTask, ExecuteRemoteTask
-from celery.task.base import crontab
 from celery.task.control import discard_all
 from celery.task.builtins import PingTask
 from celery.task.http import HttpDispatchTask
 
 __all__ = ["Task", "TaskSet", "PeriodicTask", "tasks", "discard_all",
-           "dmap", "dmap_async", "execute_remote", "ping", "HttpDispatchTask",
-           "crontab"]
+           "dmap", "dmap_async", "execute_remote", "ping", "HttpDispatchTask"]
 
 
 def dmap(fun, args, timeout=None):

+ 93 - 0
celery/task/schedules.py

@@ -0,0 +1,93 @@
+from datetime import datetime
+
+from celery.utils.timeutils import timedelta_seconds, weekday, remaining
+
+
+class schedule(object):
+    relative = False
+
+    def __init__(self, run_every=None, relative=False):
+        self.run_every = run_every
+        self.relative = relative
+
+    def remaining_estimate(self, last_run_at):
+        """Returns when the periodic task should run next as a timedelta."""
+        return remaining(last_run_at, self.run_every, relative=self.relative)
+
+    def is_due(self, last_run_at):
+        """Returns tuple of two items ``(is_due, next_time_to_run)``,
+        where next time to run is in seconds.
+
+        See :meth:`celery.task.base.PeriodicTask.is_due` for more information.
+
+        """
+        rem_delta = self.remaining_estimate(last_run_at)
+        rem = timedelta_seconds(rem_delta)
+        if rem == 0:
+            return True, timedelta_seconds(self.run_every)
+        return False, rem
+
+
+class crontab(schedule):
+    """A crontab can be used as the ``run_every`` value of a
+    :class:`PeriodicTask` to add cron-like scheduling.
+
+    Like a :manpage:`cron` job, you can specify units of time of when
+    you would like the task to execute. While not a full implementation
+    of cron's features, it should provide a fair degree of common scheduling
+    needs.
+
+    You can specify a minute, an hour, and/or a day of the week.
+
+    .. attribute:: minute
+
+        An integer from 0-59 that represents the minute of an hour of when
+        execution should occur.
+
+    .. attribute:: hour
+
+        An integer from 0-23 that represents the hour of a day of when
+        execution should occur.
+
+    .. attribute:: day_of_week
+
+        An integer from 0-6, where Sunday = 0 and Saturday = 6, that
+        represents the day of week that execution should occur.
+
+    """
+
+    def __init__(self, minute=None, hour=None, day_of_week=None,
+            nowfun=datetime.now):
+        self.hour = hour                  # (0 - 23)
+        self.minute = minute              # (0 - 59)
+        self.day_of_week = day_of_week    # (0 - 6) (Sunday=0)
+        self.nowfun = nowfun
+
+        if isinstance(self.day_of_week, basestring):
+            self.day_of_week = weekday(self.day_of_week)
+
+    def remaining_estimate(self, last_run_at):
+        # remaining_estimate controls the frequency of scheduler
+        # ticks. The scheduler needs to wake up every second in this case.
+        return 1
+
+    def is_due(self, last_run_at):
+        now = self.nowfun()
+        last = now - last_run_at
+        due, when = False, 1
+        if last.days > 0 or last.seconds > 60:
+            if self.day_of_week in (None, now.isoweekday()):
+                due, when = self._check_hour_minute(now)
+        return due, when
+
+    def _check_hour_minute(self, now):
+        due, when = False, 1
+        if self.hour is None and self.minute is None:
+            due, when = True, 1
+        if self.hour is None and self.minute == now.minute:
+            due, when = True, 1
+        if self.hour == now.hour and self.minute is None:
+            due, when = True, 1
+        if self.hour == now.hour and self.minute == now.minute:
+            due, when = True, 1
+        return due, when

+ 5 - 4
celery/tests/test_task.py

@@ -6,6 +6,7 @@ from billiard.utils.functional import wraps
 
 from celery import task
 from celery import messaging
+from celery.task.schedules import crontab
 from celery.utils import timeutils
 from celery.utils import gen_unique_id
 from celery.result import EagerResult
@@ -472,19 +473,19 @@ class TestPeriodicTask(unittest.TestCase):
 
 
 class EveryMinutePeriodic(task.PeriodicTask):
-    run_every = task.crontab()
+    run_every = crontab()
 
 
 class HourlyPeriodic(task.PeriodicTask):
-    run_every = task.crontab(minute=30)
+    run_every = crontab(minute=30)
 
 
 class DailyPeriodic(task.PeriodicTask):
-    run_every = task.crontab(hour=7, minute=30)
+    run_every = crontab(hour=7, minute=30)
 
 
 class WeeklyPeriodic(task.PeriodicTask):
-    run_every = task.crontab(hour=7, minute=30, day_of_week="thursday")
+    run_every = crontab(hour=7, minute=30, day_of_week="thursday")
 
 
 def patch_crontab_nowfun(cls, retval):

+ 34 - 0
celery/utils/timeutils.py

@@ -55,6 +55,40 @@ def delta_resolution(dt, delta):
     return dt
 
 
+def remaining(start, ends_in, now=None, relative=True):
+    """Calculate the remaining time for a start date and a timedelta.
+
+    E.g. "how many seconds left for 30 seconds after ``start``?"
+
+    :param start: Start :class:`datetime.datetime`.
+    :param ends_in: The end delta as a :class:`datetime.timedelta`.
+
+    :keyword relative: If set to ``False``, the end time will be calculated
+        using :func:`delta_resolution` (i.e. rounded to the resolution
+          of ``ends_in``).
+    :keyword now: The current time, defaults to :func:`datetime.now`.
+
+    >>> remaining(datetime.now(), ends_in=timedelta(seconds=30))
+    '0:0:29.999948'
+
+    >>> str(remaining(datetime.now() - timedelta(minutes=29),
+                  ends_in=timedelta(hours=2)))
+    '1:30:59.999938'
+
+    >>> str(remaining(datetime.now() - timedelta(minutes=29),
+                  ends_in=timedelta(hours=2),
+                  relative=False))
+    '1:11:18.458437'
+
+    """
+    now = now or datetime.now()
+
+    end_date = start + ends_in
+    if not relative:
+        end_date = delta_resolution(end_date, ends_in)
+    return end_date - now
+
+
 def rate(rate):
     """Parses rate strings, such as ``"100/m"`` or ``"2/h"``
     and converts them to seconds."""