timer2.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. """timer2 - Scheduler for Python functions."""
  2. from __future__ import generators
  3. import atexit
  4. import heapq
  5. import sys
  6. import traceback
  7. import warnings
  8. from itertools import count
  9. from threading import Condition, Event, Lock, Thread
  10. from time import time, sleep, mktime
  11. from datetime import datetime, timedelta
  12. VERSION = (0, 1, 0)
  13. __version__ = ".".join(map(str, VERSION))
  14. __author__ = "Ask Solem"
  15. __contact__ = "ask@celeryproject.org"
  16. __homepage__ = "http://github.com/ask/timer/"
  17. __docformat__ = "restructuredtext"
  18. DEFAULT_MAX_INTERVAL = 2
  19. class TimedFunctionFailed(UserWarning):
  20. pass
  21. class Entry(object):
  22. cancelled = False
  23. def __init__(self, fun, args=None, kwargs=None):
  24. self.fun = fun
  25. self.args = args or []
  26. self.kwargs = kwargs or {}
  27. self.tref = self
  28. def __call__(self):
  29. return self.fun(*self.args, **self.kwargs)
  30. def cancel(self):
  31. self.tref.cancelled = True
  32. def to_timestamp(d):
  33. if isinstance(d, datetime):
  34. return mktime(d.timetuple())
  35. return d
  36. class Schedule(object):
  37. """ETA scheduler."""
  38. on_error = None
  39. def __init__(self, max_interval=DEFAULT_MAX_INTERVAL, on_error=None):
  40. self.max_interval = float(max_interval)
  41. self.on_error = on_error or self.on_error
  42. self._queue = []
  43. def handle_error(self, exc_info):
  44. if self.on_error:
  45. self.on_error(exc_info)
  46. return True
  47. def enter(self, entry, eta=None, priority=0):
  48. """Enter function into the scheduler.
  49. :param entry: Item to enter.
  50. :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
  51. :keyword priority: Unused.
  52. """
  53. try:
  54. eta = to_timestamp(eta)
  55. except OverflowError:
  56. if not self.handle_error(sys.exc_info()):
  57. raise
  58. if eta is None:
  59. # schedule now.
  60. eta = time()
  61. heapq.heappush(self._queue, (eta, priority, entry))
  62. return entry
  63. def __iter__(self):
  64. """The iterator yields the time to sleep for between runs."""
  65. # localize variable access
  66. nowfun = time
  67. pop = heapq.heappop
  68. q = self._queue
  69. max_interval = self.max_interval
  70. while 1:
  71. if q:
  72. eta, priority, entry = verify = q[0]
  73. now = nowfun()
  74. if now < eta:
  75. yield min(eta - now, max_interval), None
  76. else:
  77. event = pop(q)
  78. if event is verify:
  79. if not entry.cancelled:
  80. yield None, entry
  81. continue
  82. else:
  83. heapq.heappush(self._queue, event)
  84. yield None, None
  85. def empty(self):
  86. """Is the schedule empty?"""
  87. return not self._queue
  88. def clear(self):
  89. self._queue = []
  90. def info(self):
  91. return ({"eta": eta, "priority": priority, "item": item}
  92. for eta, priority, item in self.queue)
  93. @property
  94. def queue(self):
  95. events = list(self._queue)
  96. return map(heapq.heappop, [events] * len(events))
  97. class Timer(Thread):
  98. Entry = Entry
  99. Schedule = Schedule
  100. running = False
  101. on_tick = None
  102. _timer_count = count(1).next
  103. def __init__(self, schedule=None, on_error=None, on_tick=None, **kwargs):
  104. self.schedule = schedule or self.Schedule(on_error=on_error)
  105. self.on_tick = on_tick or self.on_tick
  106. Thread.__init__(self)
  107. self._shutdown = Event()
  108. self._stopped = Event()
  109. self.mutex = Lock()
  110. self.not_empty = Condition(self.mutex)
  111. self.setDaemon(True)
  112. self.setName("Timer-%s" % (self._timer_count(), ))
  113. def apply_entry(self, entry):
  114. try:
  115. entry()
  116. except Exception, exc:
  117. typ, val, tb = einfo = sys.exc_info()
  118. if not self.schedule.handle_error(einfo):
  119. warnings.warn(TimedFunctionFailed(repr(exc))),
  120. traceback.print_exception(typ, val, tb)
  121. def next(self):
  122. self.not_empty.acquire()
  123. try:
  124. delay, entry = self.scheduler.next()
  125. if entry is None:
  126. if delay is None:
  127. self.not_empty.wait(1.0)
  128. return delay
  129. finally:
  130. self.not_empty.release()
  131. return self.apply_entry(entry)
  132. def run(self):
  133. self.running = True
  134. self.scheduler = iter(self.schedule)
  135. while not self._shutdown.isSet():
  136. delay = self.next()
  137. if delay:
  138. if self.on_tick:
  139. self.on_tick(delay)
  140. if sleep is None:
  141. break
  142. sleep(delay)
  143. try:
  144. self._stopped.set()
  145. except TypeError: # pragma: no cover
  146. # we lost the race at interpreter shutdown,
  147. # so gc collected built-in modules.
  148. pass
  149. def stop(self):
  150. if self.running:
  151. self._shutdown.set()
  152. self._stopped.wait()
  153. self.join(1e100)
  154. self.running = False
  155. def ensure_started(self):
  156. if not self.running and not self.is_alive():
  157. self.start()
  158. def enter(self, entry, eta, priority=None):
  159. self.ensure_started()
  160. self.mutex.acquire()
  161. try:
  162. entry = self.schedule.enter(entry, eta, priority)
  163. self.not_empty.notify()
  164. return entry
  165. finally:
  166. self.mutex.release()
  167. def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
  168. return self.enter(self.Entry(fun, args, kwargs), eta, priority)
  169. def enter_after(self, msecs, entry, priority=0):
  170. eta = datetime.now() + timedelta(seconds=msecs / 1000.0)
  171. return self.enter(entry, eta, priority)
  172. def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
  173. return self.enter_after(msecs, Entry(fun, args, kwargs), priority)
  174. def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
  175. tref = Entry(fun, args, kwargs)
  176. def _reschedules(*args, **kwargs):
  177. try:
  178. return fun(*args, **kwargs)
  179. finally:
  180. if not tref.cancelled:
  181. self.enter_after(msecs, tref, priority)
  182. tref.fun = _reschedules
  183. return self.enter_after(msecs, tref, priority)
  184. def exit_after(self, msecs, priority=10):
  185. self.apply_after(msecs, sys.exit, priority)
  186. def cancel(self, tref):
  187. tref.cancel()
  188. def clear(self):
  189. self.schedule.clear()
  190. def empty(self):
  191. return self.schedule.empty()
  192. @property
  193. def queue(self):
  194. return self.schedule.queue
  195. _default_timer = Timer()
  196. apply_after = _default_timer.apply_after
  197. apply_at = _default_timer.apply_at
  198. apply_interval = _default_timer.apply_interval
  199. enter_after = _default_timer.enter_after
  200. enter = _default_timer.enter
  201. exit_after = _default_timer.exit_after
  202. cancel = _default_timer.cancel
  203. clear = _default_timer.clear
  204. atexit.register(_default_timer.stop)