timer2.py 6.1 KB


  1. """timer2 - Scheduler for Python functions."""
from __future__ import generators

import atexit
import heapq
import itertools
import sys
import threading
import traceback
import warnings
from datetime import datetime, timedelta
from threading import Thread, Event
from time import time, sleep, mktime
  11. VERSION = (0, 1, 0)
  12. __version__ = ".".join(map(str, VERSION))
  13. __author__ = "Ask Solem"
  14. __contact__ = "ask@celeryproject.org"
  15. __homepage__ = "http://github.com/ask/timer/"
  16. __docformat__ = "restructuredtext"
  17. DEFAULT_MAX_INTERVAL = 2
  18. class TimedFunctionFailed(UserWarning):
  19. pass
  20. class Entry(object):
  21. cancelled = False
  22. def __init__(self, fun, args, kwargs):
  23. self.fun = fun
  24. self.args = args or []
  25. self.kwargs = kwargs or {}
  26. self.tref = self
  27. def __call__(self):
  28. return self.fun(*self.args, **self.kwargs)
  29. def cancel(self):
  30. self.tref.cancelled = True
  31. class Schedule(object):
  32. """ETA scheduler."""
  33. on_error = None
  34. def __init__(self, max_interval=DEFAULT_MAX_INTERVAL, on_error=None):
  35. self.max_interval = float(max_interval)
  36. self.on_error = on_error or self.on_error
  37. self._queue = []
  38. def handle_error(self, exc_info):
  39. if self.on_error:
  40. self.on_error(exc_info)
  41. return True
  42. def enter(self, entry, eta=None, priority=0):
  43. """Enter function into the scheduler.
  44. :param entry: Item to enter.
  45. :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
  46. :keyword priority: Unused.
  47. """
  48. if isinstance(eta, datetime):
  49. try:
  50. eta = mktime(eta.timetuple())
  51. except OverflowError:
  52. self.handle_error(sys.exc_info())
  53. eta = eta or time()
  54. heapq.heappush(self._queue, (eta, priority, entry))
  55. return entry
  56. def __iter__(self):
  57. """The iterator yields the time to sleep for between runs."""
  58. # localize variable access
  59. nowfun = time
  60. pop = heapq.heappop
  61. while 1:
  62. if self._queue:
  63. eta, priority, entry = verify = self._queue[0]
  64. now = nowfun()
  65. if now < eta:
  66. yield min(eta - now, self.max_interval)
  67. else:
  68. event = pop(self._queue)
  69. if event is verify:
  70. if not entry.cancelled:
  71. try:
  72. entry()
  73. except Exception, exc:
  74. typ, val, tb = einfo = sys.exc_info()
  75. if not self.handle_error(einfo):
  76. warnings.warn(repr(exc),
  77. TimedFunctionFailed)
  78. traceback.print_exception(typ, val, tb)
  79. continue
  80. else:
  81. heapq.heappush(self._queue, event)
  82. yield None
  83. def empty(self):
  84. """Is the schedule empty?"""
  85. return not self._queue
  86. def clear(self):
  87. self._queue = []
  88. def info(self):
  89. return ({"eta": eta, "priority": priority, "item": item}
  90. for eta, priority, item in self.queue)
  91. @property
  92. def queue(self):
  93. events = list(self._queue)
  94. return map(heapq.heappop, [events]*len(events))
  95. class Timer(Thread):
  96. Entry = Entry
  97. precision = 0.3
  98. running = False
  99. on_tick = None
  100. def __init__(self, schedule=None, precision=None, on_error=None,
  101. on_tick=None):
  102. if precision is not None:
  103. self.precision = precision
  104. self.schedule = schedule or Schedule(on_error=on_error)
  105. self.on_tick = on_tick or self.on_tick
  106. Thread.__init__(self)
  107. self._shutdown = Event()
  108. self._stopped = Event()
  109. self.setDaemon(True)
  110. def run(self):
  111. self.running = True
  112. scheduler = iter(self.schedule)
  113. while not self._shutdown.isSet():
  114. delay = scheduler.next() or self.precision
  115. if self.on_tick:
  116. self.on_tick(delay)
  117. if sleep is None:
  118. break
  119. sleep(delay)
  120. try:
  121. self._stopped.set()
  122. except TypeError:
  123. # we lost the race at interpreter shutdown,
  124. # so gc collected built-in modules.
  125. pass
  126. def stop(self):
  127. if self.running:
  128. self._shutdown.set()
  129. self._stopped.wait()
  130. self.join(1e100)
  131. def enter(self, entry, eta, priority=None):
  132. if not self.running:
  133. self.start()
  134. return self.schedule.enter(entry, eta, priority)
  135. def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
  136. return self.enter(self.Entry(fun, args, kwargs), eta, priority)
  137. def enter_after(self, msecs, entry, priority=0):
  138. eta = datetime.now() + timedelta(seconds=msecs / 1000.0)
  139. return self.enter(entry, eta, priority)
  140. def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
  141. return self.enter_after(msecs, Entry(fun, args, kwargs), priority)
  142. def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
  143. tref = Entry(fun, args, kwargs)
  144. def _reschedules(*args, **kwargs):
  145. try:
  146. return fun(*args, **kwargs)
  147. finally:
  148. self.enter_after(msecs, tref, priority)
  149. tref.fun = _reschedules
  150. return self.enter_after(msecs, tref, priority)
  151. def exit_after(self, msecs, priority=10):
  152. self.apply_after(msecs, sys.exit, priority)
  153. def cancel(self, tref):
  154. tref.cancel()
  155. def clear(self):
  156. self.schedule.clear()
  157. def empty(self):
  158. return self.schedule.empty()
  159. @property
  160. def queue(self):
  161. return self.schedule.queue
  162. _default_timer = Timer()
  163. apply_after = _default_timer.apply_after
  164. apply_at = _default_timer.apply_at
  165. apply_interval = _default_timer.apply_interval
  166. enter_after = _default_timer.enter_after
  167. enter = _default_timer.enter
  168. exit_after = _default_timer.exit_after
  169. cancel = _default_timer.cancel
  170. clear = _default_timer.clear
  171. atexit.register(_default_timer.stop)