timer2.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. # -*- coding: utf-8 -*-
  2. """
  3. timer2
  4. ~~~~~~
  5. Scheduler for Python functions.
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import atexit
  10. import heapq
  11. import os
  12. import sys
  13. from functools import wraps
  14. from itertools import count
  15. from threading import Condition, Event, Lock, Thread
  16. from time import time, sleep, mktime
  17. from datetime import datetime, timedelta
  18. from kombu.log import get_logger
  19. VERSION = (1, 0, 0)
  20. __version__ = '.'.join(map(str, VERSION))
  21. __author__ = 'Ask Solem'
  22. __contact__ = 'ask@celeryproject.org'
  23. __homepage__ = 'http://github.com/ask/timer2/'
  24. __docformat__ = 'restructuredtext'
  25. DEFAULT_MAX_INTERVAL = 2
  26. TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
  27. logger = get_logger('timer2')
  28. class Entry(object):
  29. cancelled = False
  30. def __init__(self, fun, args=None, kwargs=None):
  31. self.fun = fun
  32. self.args = args or []
  33. self.kwargs = kwargs or {}
  34. self.tref = self
  35. def __call__(self):
  36. return self.fun(*self.args, **self.kwargs)
  37. def cancel(self):
  38. self.tref.cancelled = True
  39. def __repr__(self):
  40. return '<TimerEntry: %s(*%r, **%r)' % (
  41. self.fun.__name__, self.args, self.kwargs)
  42. if sys.version_info[0] == 3: # pragma: no cover
  43. def __hash__(self):
  44. return hash('|'.join(map(repr, (self.fun, self.args,
  45. self.kwargs))))
  46. def __lt__(self, other):
  47. return hash(self) < hash(other)
  48. def __gt__(self, other):
  49. return hash(self) > hash(other)
  50. def __eq__(self, other):
  51. return hash(self) == hash(other)
  52. def to_timestamp(d):
  53. if isinstance(d, datetime):
  54. return mktime(d.timetuple())
  55. return d
  56. class Schedule(object):
  57. """ETA scheduler."""
  58. Entry = Entry
  59. on_error = None
  60. def __init__(self, max_interval=None, on_error=None, **kwargs):
  61. self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
  62. self.on_error = on_error or self.on_error
  63. self._queue = []
  64. def apply_entry(self, entry):
  65. try:
  66. entry()
  67. except Exception, exc:
  68. if not self.handle_error(exc):
  69. logger.error('Error in timer: %r', exc, exc_info=True)
  70. def handle_error(self, exc_info):
  71. if self.on_error:
  72. self.on_error(exc_info)
  73. return True
  74. def stop(self):
  75. pass
  76. def enter(self, entry, eta=None, priority=0):
  77. """Enter function into the scheduler.
  78. :param entry: Item to enter.
  79. :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
  80. :keyword priority: Unused.
  81. """
  82. if eta is None:
  83. eta = datetime.now()
  84. if isinstance(eta, datetime):
  85. try:
  86. eta = to_timestamp(eta)
  87. except OverflowError, exc:
  88. if not self.handle_error(exc):
  89. raise
  90. return
  91. return self._enter(eta, priority, entry)
  92. def _enter(self, eta, priority, entry):
  93. heapq.heappush(self._queue, (eta, priority, entry))
  94. return entry
  95. def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
  96. return self.enter(self.Entry(fun, args, kwargs), eta, priority)
  97. def enter_after(self, msecs, entry, priority=0):
  98. eta = datetime.now() + timedelta(seconds=msecs / 1000.0)
  99. return self.enter(entry, eta, priority)
  100. def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
  101. return self.enter_after(msecs, self.Entry(fun, args, kwargs), priority)
  102. def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
  103. tref = self.Entry(fun, args, kwargs)
  104. secs = msecs * 1000.0
  105. @wraps(fun)
  106. def _reschedules(*args, **kwargs):
  107. last, now = tref._last_run, time()
  108. lsince = (now - tref._last_run) * 1000.0 if last else msecs
  109. try:
  110. if lsince and lsince >= msecs:
  111. tref._last_run = now
  112. return fun(*args, **kwargs)
  113. finally:
  114. if not tref.cancelled:
  115. last = tref._last_run
  116. next = secs - (now - last) if last else secs
  117. self.enter_after(next / 1000.0, tref, priority)
  118. tref.fun = _reschedules
  119. tref._last_run = None
  120. return self.enter_after(msecs, tref, priority)
  121. def __iter__(self):
  122. """The iterator yields the time to sleep for between runs."""
  123. # localize variable access
  124. nowfun = time
  125. pop = heapq.heappop
  126. max_interval = self.max_interval
  127. queue = self._queue
  128. while 1:
  129. if queue:
  130. eta, priority, entry = verify = queue[0]
  131. now = nowfun()
  132. if now < eta:
  133. yield min(eta - now, max_interval), None
  134. else:
  135. event = pop(queue)
  136. if event is verify:
  137. if not entry.cancelled:
  138. yield None, entry
  139. continue
  140. else:
  141. heapq.heappush(queue, event)
  142. else:
  143. yield None, None
  144. def empty(self):
  145. """Is the schedule empty?"""
  146. return not self._queue
  147. def clear(self):
  148. self._queue[:] = [] # used because we can't replace the object
  149. # and the operation is atomic.
  150. def info(self):
  151. return ({'eta': eta, 'priority': priority, 'item': item}
  152. for eta, priority, item in self.queue)
  153. def cancel(self, tref):
  154. tref.cancel()
  155. @property
  156. def queue(self):
  157. events = list(self._queue)
  158. return map(heapq.heappop, [events] * len(events))
  159. class Timer(Thread):
  160. Entry = Entry
  161. Schedule = Schedule
  162. running = False
  163. on_tick = None
  164. _timer_count = count(1).next
  165. if TIMER_DEBUG: # pragma: no cover
  166. def start(self, *args, **kwargs):
  167. import traceback
  168. print('- Timer starting')
  169. traceback.print_stack()
  170. super(Timer, self).start(*args, **kwargs)
  171. def __init__(self, schedule=None, on_error=None, on_tick=None,
  172. max_interval=None, **kwargs):
  173. self.schedule = schedule or self.Schedule(on_error=on_error,
  174. max_interval=max_interval)
  175. self.on_tick = on_tick or self.on_tick
  176. Thread.__init__(self)
  177. self._is_shutdown = Event()
  178. self._is_stopped = Event()
  179. self.mutex = Lock()
  180. self.not_empty = Condition(self.mutex)
  181. self.setDaemon(True)
  182. self.setName('Timer-%s' % (self._timer_count(), ))
  183. def _next_entry(self):
  184. with self.not_empty:
  185. delay, entry = self.scheduler.next()
  186. if entry is None:
  187. if delay is None:
  188. self.not_empty.wait(1.0)
  189. return delay
  190. return self.schedule.apply_entry(entry)
  191. __next__ = next = _next_entry # for 2to3
  192. def run(self):
  193. try:
  194. self.running = True
  195. self.scheduler = iter(self.schedule)
  196. while not self._is_shutdown.isSet():
  197. delay = self._next_entry()
  198. if delay:
  199. if self.on_tick:
  200. self.on_tick(delay)
  201. if sleep is None: # pragma: no cover
  202. break
  203. sleep(delay)
  204. try:
  205. self._is_stopped.set()
  206. except TypeError: # pragma: no cover
  207. # we lost the race at interpreter shutdown,
  208. # so gc collected built-in modules.
  209. pass
  210. except Exception, exc:
  211. logger.error('Thread Timer crashed: %r', exc, exc_info=True)
  212. os._exit(1)
  213. def stop(self):
  214. if self.running:
  215. self._is_shutdown.set()
  216. self._is_stopped.wait()
  217. self.join(1e10)
  218. self.running = False
  219. def ensure_started(self):
  220. if not self.running and not self.isAlive():
  221. self.start()
  222. def _do_enter(self, meth, *args, **kwargs):
  223. self.ensure_started()
  224. with self.mutex:
  225. entry = getattr(self.schedule, meth)(*args, **kwargs)
  226. self.not_empty.notify()
  227. return entry
  228. def enter(self, entry, eta, priority=None):
  229. return self._do_enter('enter', entry, eta, priority=priority)
  230. def apply_at(self, *args, **kwargs):
  231. return self._do_enter('apply_at', *args, **kwargs)
  232. def enter_after(self, *args, **kwargs):
  233. return self._do_enter('enter_after', *args, **kwargs)
  234. def apply_after(self, *args, **kwargs):
  235. return self._do_enter('apply_after', *args, **kwargs)
  236. def apply_interval(self, *args, **kwargs):
  237. return self._do_enter('apply_interval', *args, **kwargs)
  238. def exit_after(self, msecs, priority=10):
  239. self.apply_after(msecs, sys.exit, priority)
  240. def cancel(self, tref):
  241. tref.cancel()
  242. def clear(self):
  243. self.schedule.clear()
  244. def empty(self):
  245. return self.schedule.empty()
  246. @property
  247. def queue(self):
  248. return self.schedule.queue
  249. default_timer = _default_timer = Timer()
  250. apply_after = _default_timer.apply_after
  251. apply_at = _default_timer.apply_at
  252. apply_interval = _default_timer.apply_interval
  253. enter_after = _default_timer.enter_after
  254. enter = _default_timer.enter
  255. exit_after = _default_timer.exit_after
  256. cancel = _default_timer.cancel
  257. clear = _default_timer.clear
  258. atexit.register(_default_timer.stop)