Browse Source

Moves timer to kombu.async.timer

Ask Solem 11 years ago
parent
commit
2a45066ddf

+ 3 - 3
celery/events/snapshot.py

@@ -16,7 +16,7 @@ from kombu.utils.limits import TokenBucket
 
 from celery import platforms
 from celery.app import app_or_default
-from celery.utils import timer2
+from celery.utils.timer2 import Timer
 from celery.utils.dispatch import Signal
 from celery.utils.imports import instantiate
 from celery.utils.log import get_logger
@@ -28,7 +28,7 @@ logger = get_logger('celery.evcam')
 
 
 class Polaroid(object):
-    timer = timer2
+    timer = None
     shutter_signal = Signal(providing_args=('state', ))
     cleanup_signal = Signal()
     clear_after = False
@@ -42,7 +42,7 @@ class Polaroid(object):
         self.state = state
         self.freq = freq
         self.cleanup_freq = cleanup_freq
-        self.timer = timer or self.timer
+        self.timer = timer or self.timer or Timer()
         self.logger = logger
         self.maxrate = maxrate and TokenBucket(rate(maxrate))
 

+ 10 - 19
celery/tests/utils/test_timer2.py

@@ -3,11 +3,9 @@ from __future__ import absolute_import
 import sys
 import time
 
-from mock import Mock, patch
-
 import celery.utils.timer2 as timer2
 
-from celery.tests.case import Case, skip_if_quick
+from celery.tests.case import Case, Mock, patch, skip_if_quick
 from kombu.tests.case import redirect_stdouts
 
 
@@ -51,27 +49,20 @@ class test_Schedule(Case):
         to_timestamp = timer2.to_timestamp
         scratch = [None]
 
-        def _overflow(x):
-            raise OverflowError(x)
-
         def on_error(exc_info):
             scratch[0] = exc_info
 
         s = timer2.Schedule(on_error=on_error)
 
-        timer2.to_timestamp = _overflow
-        try:
-            s.enter(timer2.Entry(lambda: None, (), {}),
-                    eta=datetime.now())
-            s.enter(timer2.Entry(lambda: None, (), {}),
-                    eta=None)
+        with patch('kombu.async.timer.to_timestamp') as tot:
+            tot.side_effect = OverflowError()
+            s.enter_at(timer2.Entry(lambda: None, (), {}),
+                       eta=datetime.now())
+            s.enter_at(timer2.Entry(lambda: None, (), {}), eta=None)
             s.on_error = None
             with self.assertRaises(OverflowError):
-                s.enter(timer2.Entry(lambda: None, (), {}),
-                        eta=datetime.now())
-        finally:
-            timer2.to_timestamp = to_timestamp
-
+                s.enter_at(timer2.Entry(lambda: None, (), {}),
+                           eta=datetime.now())
         exc = scratch[0]
         self.assertIsInstance(exc, OverflowError)
 
@@ -136,7 +127,7 @@ class test_Timer(Case):
         finally:
             t.stop()
 
-    @patch('celery.utils.timer2.logger')
+    @patch('kombu.async.timer.logger')
     def test_apply_entry_error_handled(self, logger):
         t = timer2.Timer()
         t.schedule.on_error = None
@@ -183,7 +174,7 @@ class test_Timer(Case):
         t._do_enter = Mock()
         e = Mock()
         t.enter(e, 13, 0)
-        t._do_enter.assert_called_with('enter', e, 13, priority=0)
+        t._do_enter.assert_called_with('enter_at', e, 13, priority=0)
 
     def test_test_enter_after(self):
         t = timer2.Timer()

+ 1 - 1
celery/tests/worker/test_autoreload.py

@@ -106,7 +106,7 @@ class test_StatMonitor(Case):
         x._mtime('a')
 
 
-class test_KQueueMontior(Case):
+class test_KQueueMonitor(Case):
 
     @patch('select.kqueue', create=True)
     @patch('os.close')

+ 2 - 2
celery/tests/worker/test_control.py

@@ -332,10 +332,10 @@ class test_ControlPanel(AppCase):
         panel = self.create_panel(consumer=consumer)
         self.assertFalse(panel.handle('dump_schedule'))
         r = TaskRequest(self.mytask.name, 'CAFEBABE', (), {}, app=self.app)
-        consumer.timer.schedule.enter(
+        consumer.timer.schedule.enter_at(
             consumer.timer.Entry(lambda x: x, (r, )),
             datetime.now() + timedelta(seconds=10))
-        consumer.timer.schedule.enter(
+        consumer.timer.schedule.enter_at(
             consumer.timer.Entry(lambda x: x, (object(), )),
             datetime.now() + timedelta(seconds=10))
         self.assertTrue(panel.handle('dump_schedule'))

+ 7 - 210
celery/utils/timer2.py

@@ -13,220 +13,17 @@ import os
 import sys
 import threading
 
-from collections import namedtuple
-from datetime import datetime
-from functools import wraps
 from itertools import count
-from time import time, sleep
-from weakref import proxy as weakrefproxy
+from time import sleep
 
 from celery.five import THREAD_TIMEOUT_MAX
 from celery.utils.timeutils import timedelta_seconds, timezone
-from kombu.log import get_logger
+from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
 
-VERSION = (1, 0, 0)
-__version__ = '.'.join(str(p) for p in VERSION)
-__author__ = 'Ask Solem'
-__contact__ = 'ask@celeryproject.org'
-__homepage__ = 'http://github.com/ask/timer2/'
-__docformat__ = 'restructuredtext'
-
-DEFAULT_MAX_INTERVAL = 2
 TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
-EPOCH = datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc)
-IS_PYPY = hasattr(sys, 'pypy_version_info')
-
-logger = get_logger('timer2')
 
 __all__ = ['Entry', 'Schedule', 'Timer', 'to_timestamp']
 
-scheduled = namedtuple('scheduled', ('eta', 'priority', 'entry'))
-
-
-class Entry(object):
-    if not IS_PYPY:  # pragma: no cover
-        __slots__ = (
-            'fun', 'args', 'kwargs', 'tref', 'cancelled',
-            '_last_run', '__weakref__',
-        )
-
-    def __init__(self, fun, args=None, kwargs=None):
-        self.fun = fun
-        self.args = args or []
-        self.kwargs = kwargs or {}
-        self.tref = weakrefproxy(self)
-        self._last_run = None
-        self.cancelled = False
-
-    def __call__(self):
-        return self.fun(*self.args, **self.kwargs)
-
-    def cancel(self):
-        try:
-            self.tref.cancelled = True
-        except ReferenceError:  # pragma: no cover
-            pass
-
-    def __repr__(self):
-        return '<TimerEntry: {0}(*{1!r}, **{2!r})'.format(
-            self.fun.__name__, self.args, self.kwargs)
-
-    if sys.version_info[0] == 3:  # pragma: no cover
-
-        def __hash__(self):
-            return hash('{0.fun!r}|{0.args!r}|{0.kwargs!r}'.format(self))
-
-        def __lt__(self, other):
-            return hash(self) < hash(other)
-
-        def __gt__(self, other):
-            return hash(self) > hash(other)
-
-        def __eq__(self, other):
-            return hash(self) == hash(other)
-
-        def __ne__(self, other):
-            return not self.__eq__(other)
-
-
-def to_timestamp(d, default_timezone=timezone.utc):
-    if isinstance(d, datetime):
-        if d.tzinfo is None:
-            d = d.replace(tzinfo=default_timezone)
-        return timedelta_seconds(d - EPOCH)
-    return d
-
-
-class Schedule(object):
-    """ETA scheduler."""
-    Entry = Entry
-
-    on_error = None
-
-    def __init__(self, max_interval=None, on_error=None, **kwargs):
-        self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
-        self.on_error = on_error or self.on_error
-        self._queue = []
-
-    def apply_entry(self, entry):
-        try:
-            entry()
-        except Exception as exc:
-            if not self.handle_error(exc):
-                logger.error('Error in timer: %r', exc, exc_info=True)
-
-    def handle_error(self, exc_info):
-        if self.on_error:
-            self.on_error(exc_info)
-            return True
-
-    def stop(self):
-        pass
-
-    def enter(self, entry, eta=None, priority=0):
-        """Enter function into the scheduler.
-
-        :param entry: Item to enter.
-        :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
-        :keyword priority: Unused.
-
-        """
-        if eta is None:
-            eta = time()
-        if isinstance(eta, datetime):
-            try:
-                eta = to_timestamp(eta)
-            except Exception as exc:
-                if not self.handle_error(exc):
-                    raise
-                return
-        return self._enter(eta, priority, entry)
-
-    def _enter(self, eta, priority, entry):
-        heapq.heappush(self._queue, scheduled(eta, priority, entry))
-        return entry
-
-    def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
-        return self.enter(self.Entry(fun, args, kwargs), eta, priority)
-
-    def enter_after(self, msecs, entry, priority=0, time=time):
-        return self.enter(entry, time() + (msecs / 1000.0), priority)
-
-    def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
-        return self.enter_after(msecs, self.Entry(fun, args, kwargs), priority)
-
-    def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
-        tref = self.Entry(fun, args, kwargs)
-        secs = msecs * 1000.0
-
-        @wraps(fun)
-        def _reschedules(*args, **kwargs):
-            last, now = tref._last_run, time()
-            lsince = (now - tref._last_run) * 1000.0 if last else msecs
-            try:
-                if lsince and lsince >= msecs:
-                    tref._last_run = now
-                    return fun(*args, **kwargs)
-            finally:
-                if not tref.cancelled:
-                    last = tref._last_run
-                    next = secs - (now - last) if last else secs
-                    self.enter_after(next / 1000.0, tref, priority)
-
-        tref.fun = _reschedules
-        tref._last_run = None
-        return self.enter_after(msecs, tref, priority)
-
-    @property
-    def schedule(self):
-        return self
-
-    def __iter__(self, min=min, nowfun=time, pop=heapq.heappop,
-                 push=heapq.heappush):
-        """The iterator yields the time to sleep for between runs."""
-        max_interval = self.max_interval
-        queue = self._queue
-
-        while 1:
-            if queue:
-                eventA = queue[0]
-                now, eta = nowfun(), eventA[0]
-
-                if now < eta:
-                    yield min(eta - now, max_interval), None
-                else:
-                    eventB = pop(queue)
-
-                    if eventB is eventA:
-                        entry = eventA[2]
-                        if not entry.cancelled:
-                            yield None, entry
-                        continue
-                    else:
-                        push(queue, eventB)
-            else:
-                yield None, None
-
-    def empty(self):
-        """Is the schedule empty?"""
-        return not self._queue
-
-    def clear(self):
-        self._queue[:] = []  # atomic, without creating a new list.
-
-    def info(self):
-        return ({'eta': eta, 'priority': priority, 'item': item}
-                for eta, priority, item in self.queue)
-
-    def cancel(self, tref):
-        tref.cancel()
-
-    @property
-    def queue(self, _pop=heapq.heappop):
-        """Snapshot of underlying datastructure."""
-        events = list(self._queue)
-        return [_pop(v) for v in [events] * len(events)]
-
 
 class Timer(threading.Thread):
     Entry = Entry
@@ -311,19 +108,19 @@ class Timer(threading.Thread):
             return entry
 
     def enter(self, entry, eta, priority=None):
-        return self._do_enter('enter', entry, eta, priority=priority)
+        return self._do_enter('enter_at', entry, eta, priority=priority)
 
     def apply_at(self, *args, **kwargs):
-        return self._do_enter('apply_at', *args, **kwargs)
+        return self._do_enter('call_at', *args, **kwargs)
 
     def enter_after(self, *args, **kwargs):
         return self._do_enter('enter_after', *args, **kwargs)
 
     def apply_after(self, *args, **kwargs):
-        return self._do_enter('apply_after', *args, **kwargs)
+        return self._do_enter('call_after', *args, **kwargs)
 
     def apply_interval(self, *args, **kwargs):
-        return self._do_enter('apply_interval', *args, **kwargs)
+        return self._do_enter('call_repeatedly', *args, **kwargs)
 
     def exit_after(self, msecs, priority=10):
         self.apply_after(msecs, sys.exit, priority)
@@ -335,7 +132,7 @@ class Timer(threading.Thread):
         self.schedule.clear()
 
     def empty(self):
-        return self.schedule.empty()
+        return not len(self.schedule)
 
     @property
     def queue(self):

+ 1 - 25
celery/utils/timeutils.py

@@ -15,6 +15,7 @@ from calendar import monthrange
 from datetime import date, datetime, timedelta, tzinfo
 
 from kombu.utils import cached_property, reprcall
+from kombu.utils.compat import timedelta_seconds
 
 from pytz import timezone as _timezone, AmbiguousTimeError
 
@@ -39,8 +40,6 @@ RATE_MODIFIER_MAP = {'s': lambda n: n,
                      'm': lambda n: n / 60.0,
                      'h': lambda n: n / 60.0 / 60.0}
 
-HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, 'total_seconds')
-
 TIME_UNITS = (('day', 60 * 60 * 24.0, lambda n: format(n, '.2f')),
               ('hour', 60 * 60.0, lambda n: format(n, '.2f')),
               ('minute', 60.0, lambda n: format(n, '.2f')),
@@ -140,29 +139,6 @@ def maybe_timedelta(delta):
     return delta
 
 
-if HAVE_TIMEDELTA_TOTAL_SECONDS:   # pragma: no cover
-
-    def timedelta_seconds(delta):
-        """Convert :class:`datetime.timedelta` to seconds.
-
-        Doesn't account for negative values.
-
-        """
-        return max(delta.total_seconds(), 0)
-
-else:  # pragma: no cover
-
-    def timedelta_seconds(delta):  # noqa
-        """Convert :class:`datetime.timedelta` to seconds.
-
-        Doesn't account for negative values.
-
-        """
-        if delta.days < 0:
-            return 0
-        return delta.days * 86400 + delta.seconds + (delta.microseconds / 10e5)
-
-
 def delta_resolution(dt, delta):
     """Round a datetime to the resolution of a timedelta.
 

+ 2 - 2
celery/worker/components.py

@@ -12,12 +12,12 @@ import atexit
 
 from kombu.async import Hub as _Hub, get_event_loop, set_event_loop
 from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore
+from kombu.async.timer import Timer as _Timer
 
 from celery import bootsteps
 from celery.exceptions import ImproperlyConfigured
 from celery.five import string_t
 from celery.utils.log import worker_logger as logger
-from celery.utils.timer2 import Schedule
 
 __all__ = ['Timer', 'Hub', 'Queues', 'Pool', 'Beat', 'StateDB', 'Consumer']
 
@@ -33,7 +33,7 @@ class Timer(bootsteps.Step):
     def create(self, w):
         if w.use_eventloop:
             # does not use dedicated timer thread.
-            w.timer = Schedule(max_interval=10.0)
+            w.timer = _Timer(max_interval=10.0)
         else:
             if not w.timer_cls:
                 # Default Timer is set by the pool, as e.g. eventlet

+ 1 - 1
celery/worker/strategy.py

@@ -10,10 +10,10 @@ from __future__ import absolute_import
 
 import logging
 
+from kombu.async.timer import to_timestamp
 from kombu.utils.encoding import safe_repr
 
 from celery.utils.log import get_logger
-from celery.utils.timer2 import to_timestamp
 from celery.utils.timeutils import timezone
 
 from .job import Request