timer2.py 6.0 KB


  1. """timer2 - Scheduler for Python functions."""
  2. from __future__ import generators
  3. import atexit
  4. import heapq
  5. import sys
  6. import traceback
  7. import warnings
  8. from threading import Thread, Event
  9. from time import time, sleep, mktime
  10. from datetime import datetime, timedelta
  11. VERSION = (0, 1, 0)
  12. __version__ = ".".join(map(str, VERSION))
  13. __author__ = "Ask Solem"
  14. __contact__ = "ask@celeryproject.org"
  15. __homepage__ = "http://github.com/ask/timer/"
  16. __docformat__ = "restructuredtext"
  17. DEFAULT_MAX_INTERVAL = 2
  18. class TimedFunctionFailed(UserWarning):
  19. pass
  20. class Entry(object):
  21. cancelled = False
  22. def __init__(self, fun, args, kwargs):
  23. self.fun = fun
  24. self.args = args or []
  25. self.kwargs = kwargs or {}
  26. self.tref = self
  27. def __call__(self):
  28. return self.fun(*self.args, **self.kwargs)
  29. def cancel(self):
  30. self.tref.cancelled = True
  31. class Schedule(object):
  32. """ETA scheduler."""
  33. on_error = None
  34. def __init__(self, max_interval=DEFAULT_MAX_INTERVAL, on_error=None):
  35. self.max_interval = float(max_interval)
  36. self.on_error = on_error or self.on_error
  37. self._queue = []
  38. def handle_error(self, exc_info):
  39. if self.on_error:
  40. self.on_error(exc_info)
  41. return True
  42. def enter(self, entry, eta=None, priority=0):
  43. """Enter function into the scheduler.
  44. :param entry: Item to enter.
  45. :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
  46. :keyword priority: Unused.
  47. """
  48. if isinstance(eta, datetime):
  49. try:
  50. eta = mktime(eta.timetuple())
  51. except OverflowError:
  52. self.handle_error(sys.exc_info())
  53. eta = eta or time()
  54. heapq.heappush(self._queue, (eta, priority, entry))
  55. return entry
  56. def __iter__(self):
  57. """The iterator yields the time to sleep for between runs."""
  58. # localize variable access
  59. nowfun = time
  60. pop = heapq.heappop
  61. while 1:
  62. if self._queue:
  63. eta, priority, entry = verify = self._queue[0]
  64. now = nowfun()
  65. if now < eta:
  66. yield min(eta - now, self.max_interval)
  67. else:
  68. event = pop(self._queue)
  69. if event is verify:
  70. if not entry.cancelled:
  71. try:
  72. entry()
  73. except Exception, exc:
  74. typ, val, tb = einfo = sys.exc_info()
  75. if not self.handle_error(einfo):
  76. warnings.warn(repr(exc),
  77. TimedFunctionFailed)
  78. traceback.print_exception(typ, val, tb)
  79. continue
  80. else:
  81. heapq.heappush(self._queue, event)
  82. yield None
  83. def empty(self):
  84. """Is the schedule empty?"""
  85. return not self._queue
  86. def clear(self):
  87. self._queue = []
  88. def info(self):
  89. return ({"eta": eta, "priority": priority, "item": item}
  90. for eta, priority, item in self.queue)
  91. @property
  92. def queue(self):
  93. events = list(self._queue)
  94. return map(heapq.heappop, [events]*len(events))
  95. class Timer(Thread):
  96. Entry = Entry
  97. precision = 0.3
  98. running = False
  99. on_tick = None
  100. def __init__(self, schedule=None, precision=None, on_error=None,
  101. on_tick=None):
  102. if precision is not None:
  103. self.precision = precision
  104. self.schedule = schedule or Schedule(on_error=on_error)
  105. self.on_tick = on_tick or self.on_tick
  106. Thread.__init__(self)
  107. self._shutdown = Event()
  108. self._stopped = Event()
  109. self.setDaemon(True)
  110. def run(self):
  111. self.running = True
  112. scheduler = iter(self.schedule)
  113. while not self._shutdown.isSet():
  114. delay = scheduler.next() or self.precision
  115. if self.on_tick:
  116. self.on_tick(delay)
  117. if sleep is None:
  118. break
  119. sleep(delay)
  120. self._stopped.set()
  121. def stop(self):
  122. if self.running:
  123. self._shutdown.set()
  124. self._stopped.wait()
  125. self.join(1e100)
  126. def enter(self, entry, eta, priority=None):
  127. if not self.running:
  128. self.start()
  129. return self.schedule.enter(entry, eta, priority)
  130. def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
  131. return self.enter(self.Entry(fun, args, kwargs), eta, priority)
  132. def enter_after(self, msecs, entry, priority=0):
  133. eta = datetime.now() + timedelta(seconds=msecs / 1000.0)
  134. return self.enter(entry, eta, priority)
  135. def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
  136. return self.enter_after(msecs, Entry(fun, args, kwargs), priority)
  137. def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
  138. tref = Entry(fun, args, kwargs)
  139. def _reschedules(*args, **kwargs):
  140. try:
  141. return fun(*args, **kwargs)
  142. finally:
  143. self.enter_after(msecs, tref, priority)
  144. tref.fun = _reschedules
  145. return self.enter_after(msecs, tref, priority)
  146. def exit_after(self, msecs, priority=10):
  147. self.apply_after(msecs, sys.exit, priority)
  148. def cancel(self, tref):
  149. tref.cancel()
  150. def clear(self):
  151. self.schedule.clear()
  152. def empty(self):
  153. return self.schedule.empty()
  154. @property
  155. def queue(self):
  156. return self.schedule.queue
  157. _default_timer = Timer()
  158. apply_after = _default_timer.apply_after
  159. apply_at = _default_timer.apply_at
  160. apply_interval = _default_timer.apply_interval
  161. enter_after = _default_timer.enter_after
  162. enter = _default_timer.enter
  163. exit_after = _default_timer.exit_after
  164. cancel = _default_timer.cancel
  165. clear = _default_timer.clear
  166. atexit.register(_default_timer.stop)