Pārlūkot izejas kodu

Moved date and time related utility functions to celery.utils.timeutils

Ask Solem 15 gadi atpakaļ
vecāks
revīzija
3ff2e37963

+ 5 - 43
celery/task/base.py

@@ -6,7 +6,9 @@ from billiard.serialization import pickle
 
 from celery import conf
 from celery.log import setup_task_logger
-from celery.utils import gen_unique_id, padlist, timedelta_seconds
+from celery.utils import gen_unique_id, padlist
+from celery.utils.timeutils import timedelta_seconds, weekday
+from celery.utils.timeutils import delta_resolution
 from celery.result import BaseAsyncResult, TaskSetResult, EagerResult
 from celery.execute import apply_async, apply
 from celery.registry import tasks
@@ -663,7 +665,7 @@ class schedule(object):
         """Returns when the periodic task should run next as a timedelta."""
         next_run_at = last_run_at + self.run_every
         if not self.relative:
-            next_run_at = self.delta_resolution(next_run_at, self.run_every)
+            next_run_at = delta_resolution(next_run_at, self.run_every)
         return next_run_at - datetime.now()
 
     def is_due(self, last_run_at):
@@ -679,39 +681,6 @@ class schedule(object):
             return True, timedelta_seconds(self.run_every)
         return False, rem
 
-    def delta_resolution(self, dt, delta):
-        """Round a datetime to the resolution of a timedelta.
-
-        If the timedelta is in days, the datetime will be rounded
-        to the nearest days, if the timedelta is in hours the datetime
-        will be rounded to the nearest hour, and so on until seconds
-        which will just return the original datetime.
-
-            >>> now = datetime.now()
-            >>> now
-            datetime.datetime(2010, 3, 30, 11, 50, 58, 41065)
-            >>> delta_resolution(now, timedelta(days=2))
-            datetime.datetime(2010, 3, 30, 0, 0)
-            >>> delta_resolution(now, timedelta(hours=2))
-            datetime.datetime(2010, 3, 30, 11, 0)
-            >>> delta_resolution(now, timedelta(minutes=2))
-            datetime.datetime(2010, 3, 30, 11, 50)
-            >>> delta_resolution(now, timedelta(seconds=2))
-            datetime.datetime(2010, 3, 30, 11, 50, 58, 41065)
-
-        """
-        delta = timedelta_seconds(delta)
-
-        resolutions = ((3, lambda x: x / 86400),
-                       (4, lambda x: x / 3600),
-                       (5, lambda x: x / 60))
-
-        args = dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second
-        for res, predicate in resolutions:
-            if predicate(delta) >= 1.0:
-                return datetime(*args[:res])
-        return dt
-
 
 class crontab(schedule):
     """A crontab can be used as the ``run_every`` value of a
@@ -740,8 +709,6 @@ class crontab(schedule):
         represents the day of week that execution should occur.
 
     """
-    daynames = "sun", "mon", "tue", "wed", "thu", "fri", "sat"
-    weekdays = dict((name, dow) for name, dow in zip(daynames, range(7)))
 
     def __init__(self, minute=None, hour=None, day_of_week=None,
             nowfun=datetime.now):
@@ -751,12 +718,7 @@ class crontab(schedule):
         self.nowfun = nowfun
 
         if isinstance(self.day_of_week, basestring):
-            abbreviation = self.day_of_week[0:3].lower()
-            try:
-                self.day_of_week = self.weekdays[abbreviation]
-            except KeyError:
-                # Show original day name in exception, instead of abbr.
-                raise KeyError(self.day_of_week)
+            self.day_of_week = weekday(self.day_of_week)
 
 
     def remaining_estimate(self, last_run_at):

+ 6 - 5
celery/tests/test_buckets.py

@@ -9,6 +9,7 @@ from itertools import chain, izip
 from billiard.utils.functional import curry
 
 from celery.task.base import Task
+from celery.utils import timeutils
 from celery.utils import gen_unique_id
 from celery.worker import buckets
 from celery.registry import TaskRegistry
@@ -97,15 +98,15 @@ class TestRateLimitString(unittest.TestCase):
 
     @skip_if_disabled
     def test_conversion(self):
-        self.assertEqual(buckets.parse_ratelimit_string(999), 999)
-        self.assertEqual(buckets.parse_ratelimit_string("1456/s"), 1456)
-        self.assertEqual(buckets.parse_ratelimit_string("100/m"),
+        self.assertEqual(timeutils.rate(999), 999)
+        self.assertEqual(timeutils.rate("1456/s"), 1456)
+        self.assertEqual(timeutils.rate("100/m"),
                           100 / 60.0)
-        self.assertEqual(buckets.parse_ratelimit_string("10/h"),
+        self.assertEqual(timeutils.rate("10/h"),
                           10 / 60.0 / 60.0)
 
         for zero in (0, None, "0", "0/m", "0/h", "0/s"):
-            self.assertEqual(buckets.parse_ratelimit_string(zero), 0)
+            self.assertEqual(timeutils.rate(zero), 0)
 
 
 class TaskA(Task):

+ 2 - 1
celery/tests/test_task.py

@@ -6,6 +6,7 @@ from billiard.utils.functional import wraps
 
 from celery import task
 from celery import messaging
+from celery.utils import timeutils
 from celery.utils import gen_unique_id
 from celery.result import EagerResult
 from celery.execute import send_task
@@ -447,7 +448,7 @@ class TestPeriodicTask(unittest.TestCase):
             self.assertEqual(MyPeriodic().timedelta_seconds(delta), seconds)
 
     def test_delta_resolution(self):
-        D = MyPeriodic().run_every.delta_resolution
+        D = timeutils.delta_resolution
 
         dt = datetime(2010, 3, 30, 11, 50, 58, 41065)
         deltamap = ((timedelta(days=2), datetime(2010, 3, 30, 0, 0)),

+ 1 - 11
celery/utils/__init__.py

@@ -19,6 +19,7 @@ from carrot.utils import rpartition
 from billiard.utils.functional import curry
 
 from celery.utils.compat import all, any, defaultdict
+from celery.utils.timeutils import timedelta_seconds # was here before
 
 
 def noop(*args, **kwargs):
@@ -189,17 +190,6 @@ def fun_takes_kwargs(fun, kwlist=[]):
     return filter(curry(operator.contains, args), kwlist)
 
 
-def timedelta_seconds(delta):
-    """Convert :class:`datetime.timedelta` to seconds.
-
-    Doesn't account for negative values.
-
-    """
-    if delta.days < 0:
-        return 0
-    return delta.days * 86400 + delta.seconds + (delta.microseconds / 10e5)
-
-
 def get_cls_by_name(name, aliases={}):
     """Get class by name.
 

+ 85 - 0
celery/utils/timeutils.py

@@ -0,0 +1,85 @@
+from datetime import datetime
+
+from carrot.utils import partition
+
+DAYNAMES = "sun", "mon", "tue", "wed", "thu", "fri", "sat"
+WEEKDAYS = dict((name, dow) for name, dow in zip(DAYNAMES, range(7)))
+
+RATE_MODIFIER_MAP = {"s": lambda n: n,
+                     "m": lambda n: n / 60.0,
+                     "h": lambda n: n / 60.0 / 60.0}
+
+
+def timedelta_seconds(delta):
+    """Convert :class:`datetime.timedelta` to seconds.
+
+    Doesn't account for negative values.
+
+    """
+    if delta.days < 0:
+        return 0
+    return delta.days * 86400 + delta.seconds + (delta.microseconds / 10e5)
+
+
+def delta_resolution(dt, delta):
+    """Round a datetime to the resolution of a timedelta.
+
+    If the timedelta is in days, the datetime will be rounded
+    to the nearest days, if the timedelta is in hours the datetime
+    will be rounded to the nearest hour, and so on until seconds
+    which will just return the original datetime.
+
+        >>> now = datetime.now()
+        >>> now
+        datetime.datetime(2010, 3, 30, 11, 50, 58, 41065)
+        >>> delta_resolution(now, timedelta(days=2))
+        datetime.datetime(2010, 3, 30, 0, 0)
+        >>> delta_resolution(now, timedelta(hours=2))
+        datetime.datetime(2010, 3, 30, 11, 0)
+        >>> delta_resolution(now, timedelta(minutes=2))
+        datetime.datetime(2010, 3, 30, 11, 50)
+        >>> delta_resolution(now, timedelta(seconds=2))
+        datetime.datetime(2010, 3, 30, 11, 50, 58, 41065)
+
+    """
+    delta = timedelta_seconds(delta)
+
+    resolutions = ((3, lambda x: x / 86400),
+                   (4, lambda x: x / 3600),
+                   (5, lambda x: x / 60))
+
+    args = dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second
+    for res, predicate in resolutions:
+        if predicate(delta) >= 1.0:
+            return datetime(*args[:res])
+    return dt
+
+
+def rate(rate):
+    """Parses rate strings, such as ``"100/m"`` or ``"2/h"``
+    and converts them to seconds."""
+    if rate:
+        if isinstance(rate, basestring):
+            ops, _, modifier = partition(rate, "/")
+            return RATE_MODIFIER_MAP[modifier or "s"](int(ops)) or 0
+        return rate or 0
+    return 0
+
+
+def weekday(name):
+    """Return the position of a weekday (0 - 7, where 0 is Sunday).
+
+        >>> weekday("sunday")
+        0
+        >>> weekday("sun")
+        0
+        >>> weekday("mon")
+        1
+
+    """
+    abbreviation = name[0:3].lower()
+    try:
+        return WEEKDAYS[abbreviation]
+    except KeyError:
+        # Show original day name in exception, instead of abbr.
+        raise KeyError(name)

+ 2 - 24
celery/worker/buckets.py

@@ -2,36 +2,14 @@ import time
 from Queue import Queue, Empty as QueueEmpty
 from itertools import chain
 
-from carrot.utils import partition
-
 from celery.utils import all
+from celery.utils import timeutils
 from celery.utils.compat import izip_longest
 
-RATE_MODIFIER_MAP = {"s": lambda n: n,
-                     "m": lambda n: n / 60.0,
-                     "h": lambda n: n / 60.0 / 60.0}
-
-
 class RateLimitExceeded(Exception):
     """The token buckets rate limit has been exceeded."""
 
 
-def parse_ratelimit_string(rate_limit):
-    """Parse rate limit configurations such as ``"100/m"`` or ``"2/h"``
-        and convert them into seconds.
-
-    Returns ``0`` for no rate limit.
-
-    """
-
-    if rate_limit:
-        if isinstance(rate_limit, basestring):
-            ops, _, modifier = partition(rate_limit, "/")
-            return RATE_MODIFIER_MAP[modifier or "s"](int(ops)) or 0
-        return rate_limit or 0
-    return 0
-
-
 class TaskBucket(object):
     """This is a collection of token buckets, each task type having
     its own token bucket. If the task type doesn't have a rate limit,
@@ -155,7 +133,7 @@ class TaskBucket(object):
     def update_bucket_for_type(self, task_name):
         task_type = self.task_registry[task_name]
         rate_limit = getattr(task_type, "rate_limit", None)
-        rate_limit = parse_ratelimit_string(rate_limit)
+        rate_limit = timeutils.rate(rate_limit)
         if task_name in self.buckets:
             task_queue = self._get_queue_for_type(task_name)
         else: