123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- # -*- coding: utf-8 -*-
- """
- timer2
- ~~~~~~
- Scheduler for Python functions.
- """
- from __future__ import absolute_import
- import atexit
- import heapq
- import os
- import sys
- import threading
- from datetime import datetime
- from functools import wraps
- from itertools import count
- from time import time, sleep
- from weakref import proxy as weakrefproxy
- from celery.five import THREAD_TIMEOUT_MAX, map
- from celery.utils.timeutils import timedelta_seconds, timezone
- from kombu.log import get_logger
- VERSION = (1, 0, 0)
- __version__ = '.'.join(map(str, VERSION))
- __author__ = 'Ask Solem'
- __contact__ = 'ask@celeryproject.org'
- __homepage__ = 'http://github.com/ask/timer2/'
- __docformat__ = 'restructuredtext'
- DEFAULT_MAX_INTERVAL = 2
- TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
- EPOCH = datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc)
- IS_PYPY = hasattr(sys, 'pypy_version_info')
- logger = get_logger('timer2')
- class Entry(object):
- if not IS_PYPY:
- __slots__ = (
- 'fun', 'args', 'kwargs', 'tref', 'cancelled',
- '_last_run', '__weakref__',
- )
- def __init__(self, fun, args=None, kwargs=None):
- self.fun = fun
- self.args = args or []
- self.kwargs = kwargs or {}
- self.tref = weakrefproxy(self)
- self._last_run = None
- self.cancelled = False
- def __call__(self):
- return self.fun(*self.args, **self.kwargs)
- def cancel(self):
- try:
- self.tref.cancelled = True
- except ReferenceError:
- pass
- def __repr__(self):
- return '<TimerEntry: {0}(*{1!r}, **{2!r})'.format(
- self.fun.__name__, self.args, self.kwargs)
- if sys.version_info[0] == 3: # pragma: no cover
- def __hash__(self):
- return hash('{0.fun!r}|{0.args!r}|{0.kwargs!r}'.format(self))
- def __lt__(self, other):
- return hash(self) < hash(other)
- def __gt__(self, other):
- return hash(self) > hash(other)
- def __eq__(self, other):
- return hash(self) == hash(other)
- def to_timestamp(d, default_timezone=timezone.utc):
- if isinstance(d, datetime):
- if d.tzinfo is None:
- d = d.replace(tzinfo=default_timezone)
- return timedelta_seconds(d - EPOCH)
- return d
- class Schedule(object):
- """ETA scheduler."""
- Entry = Entry
- on_error = None
- def __init__(self, max_interval=None, on_error=None, **kwargs):
- self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
- self.on_error = on_error or self.on_error
- self._queue = []
- def apply_entry(self, entry):
- try:
- entry()
- except Exception as exc:
- if not self.handle_error(exc):
- logger.error('Error in timer: %r', exc, exc_info=True)
- def handle_error(self, exc_info):
- if self.on_error:
- self.on_error(exc_info)
- return True
- def stop(self):
- pass
- def enter(self, entry, eta=None, priority=0):
- """Enter function into the scheduler.
- :param entry: Item to enter.
- :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
- :keyword priority: Unused.
- """
- if eta is None:
- eta = time()
- if isinstance(eta, datetime):
- try:
- eta = to_timestamp(eta)
- except Exception as exc:
- if not self.handle_error(exc):
- raise
- return
- return self._enter(eta, priority, entry)
- def _enter(self, eta, priority, entry):
- heapq.heappush(self._queue, (eta, priority, entry))
- return entry
- def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
- return self.enter(self.Entry(fun, args, kwargs), eta, priority)
- def enter_after(self, msecs, entry, priority=0, time=time):
- return self.enter(entry, time() + (msecs / 1000.0), priority)
- def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
- return self.enter_after(msecs, self.Entry(fun, args, kwargs), priority)
- def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
- tref = self.Entry(fun, args, kwargs)
- secs = msecs * 1000.0
- @wraps(fun)
- def _reschedules(*args, **kwargs):
- last, now = tref._last_run, time()
- lsince = (now - tref._last_run) * 1000.0 if last else msecs
- try:
- if lsince and lsince >= msecs:
- tref._last_run = now
- return fun(*args, **kwargs)
- finally:
- if not tref.cancelled:
- last = tref._last_run
- next = secs - (now - last) if last else secs
- self.enter_after(next / 1000.0, tref, priority)
- tref.fun = _reschedules
- tref._last_run = None
- return self.enter_after(msecs, tref, priority)
- @property
- def schedule(self):
- return self
- def __iter__(self, min=min, nowfun=time, pop=heapq.heappop,
- push=heapq.heappush):
- """The iterator yields the time to sleep for between runs."""
- max_interval = self.max_interval
- queue = self._queue
- while 1:
- if queue:
- eta, priority, entry = verify = queue[0]
- now = nowfun()
- if now < eta:
- yield min(eta - now, max_interval), None
- else:
- event = pop(queue)
- if event is verify:
- if not entry.cancelled:
- yield None, entry
- continue
- else:
- push(queue, event)
- else:
- yield None, None
- def empty(self):
- """Is the schedule empty?"""
- return not self._queue
- def clear(self):
- self._queue[:] = [] # used because we can't replace the object
- # and the operation is atomic.
- def info(self):
- return ({'eta': eta, 'priority': priority, 'item': item}
- for eta, priority, item in self.queue)
- def cancel(self, tref):
- tref.cancel()
- @property
- def queue(self):
- events = list(self._queue)
- return [heapq.heappop(x) for x in [events] * len(events)]
- class Timer(threading.Thread):
- Entry = Entry
- Schedule = Schedule
- running = False
- on_tick = None
- _timer_count = count(1)
- if TIMER_DEBUG: # pragma: no cover
- def start(self, *args, **kwargs):
- import traceback
- print('- Timer starting')
- traceback.print_stack()
- super(Timer, self).start(*args, **kwargs)
- def __init__(self, schedule=None, on_error=None, on_tick=None,
- max_interval=None, **kwargs):
- self.schedule = schedule or self.Schedule(on_error=on_error,
- max_interval=max_interval)
- self.on_tick = on_tick or self.on_tick
- threading.Thread.__init__(self)
- self._is_shutdown = threading.Event()
- self._is_stopped = threading.Event()
- self.mutex = threading.Lock()
- self.not_empty = threading.Condition(self.mutex)
- self.daemon = True
- self.name = 'Timer-{0}'.format(next(self._timer_count))
- def _next_entry(self):
- with self.not_empty:
- delay, entry = next(self.scheduler)
- if entry is None:
- if delay is None:
- self.not_empty.wait(1.0)
- return delay
- return self.schedule.apply_entry(entry)
- __next__ = next = _next_entry # for 2to3
- def run(self):
- try:
- self.running = True
- self.scheduler = iter(self.schedule)
- while not self._is_shutdown.isSet():
- delay = self._next_entry()
- if delay:
- if self.on_tick:
- self.on_tick(delay)
- if sleep is None: # pragma: no cover
- break
- sleep(delay)
- try:
- self._is_stopped.set()
- except TypeError: # pragma: no cover
- # we lost the race at interpreter shutdown,
- # so gc collected built-in modules.
- pass
- except Exception as exc:
- logger.error('Thread Timer crashed: %r', exc, exc_info=True)
- os._exit(1)
- def stop(self):
- if self.running:
- self._is_shutdown.set()
- self._is_stopped.wait()
- self.join(THREAD_TIMEOUT_MAX)
- self.running = False
- def ensure_started(self):
- if not self.running and not self.isAlive():
- self.start()
- def _do_enter(self, meth, *args, **kwargs):
- self.ensure_started()
- with self.mutex:
- entry = getattr(self.schedule, meth)(*args, **kwargs)
- self.not_empty.notify()
- return entry
- def enter(self, entry, eta, priority=None):
- return self._do_enter('enter', entry, eta, priority=priority)
- def apply_at(self, *args, **kwargs):
- return self._do_enter('apply_at', *args, **kwargs)
- def enter_after(self, *args, **kwargs):
- return self._do_enter('enter_after', *args, **kwargs)
- def apply_after(self, *args, **kwargs):
- return self._do_enter('apply_after', *args, **kwargs)
- def apply_interval(self, *args, **kwargs):
- return self._do_enter('apply_interval', *args, **kwargs)
- def exit_after(self, msecs, priority=10):
- self.apply_after(msecs, sys.exit, priority)
- def cancel(self, tref):
- tref.cancel()
- def clear(self):
- self.schedule.clear()
- def empty(self):
- return self.schedule.empty()
- @property
- def queue(self):
- return self.schedule.queue
- default_timer = _default_timer = Timer()
- apply_after = _default_timer.apply_after
- apply_at = _default_timer.apply_at
- apply_interval = _default_timer.apply_interval
- enter_after = _default_timer.enter_after
- enter = _default_timer.enter
- exit_after = _default_timer.exit_after
- cancel = _default_timer.cancel
- clear = _default_timer.clear
- atexit.register(_default_timer.stop)
|