# -*- coding: utf-8 -*-
"""
timer2
~~~~~~

Scheduler for Python functions.

"""
from __future__ import absolute_import
from __future__ import with_statement

import atexit
import heapq
import os
import sys
import threading

from functools import wraps
from itertools import count
from threading import Condition, Event, Lock, Thread
from time import time, sleep, mktime
from datetime import datetime, timedelta

from kombu.log import get_logger
  19. VERSION = (1, 0, 0)
  20. __version__ = '.'.join(map(str, VERSION))
  21. __author__ = 'Ask Solem'
  22. __contact__ = 'ask@celeryproject.org'
  23. __homepage__ = 'http://github.com/ask/timer2/'
  24. __docformat__ = 'restructuredtext'
  25. DEFAULT_MAX_INTERVAL = 2
  26. TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
  27. logger = get_logger('timer2')
  28. class Entry(object):
  29. cancelled = False
  30. def __init__(self, fun, args=None, kwargs=None):
  31. self.fun = fun
  32. self.args = args or []
  33. self.kwargs = kwargs or {}
  34. self.tref = self
  35. def __call__(self):
  36. return self.fun(*self.args, **self.kwargs)
  37. def cancel(self):
  38. self.tref.cancelled = True
  39. def __repr__(self):
  40. return '<TimerEntry: %s(*%r, **%r)' % (
  41. self.fun.__name__, self.args, self.kwargs)
  42. if sys.version_info[0] == 3: # pragma: no cover
  43. def __hash__(self):
  44. return hash('|'.join(map(repr, (self.fun, self.args,
  45. self.kwargs))))
  46. def __lt__(self, other):
  47. return hash(self) < hash(other)
  48. def __gt__(self, other):
  49. return hash(self) > hash(other)
  50. def __eq__(self, other):
  51. return hash(self) == hash(other)
  52. def to_timestamp(d):
  53. if isinstance(d, datetime):
  54. return mktime(d.timetuple())
  55. return d
  56. class Schedule(object):
  57. """ETA scheduler."""
  58. Entry = Entry
  59. on_error = None
  60. def __init__(self, max_interval=None, on_error=None, **kwargs):
  61. self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
  62. self.on_error = on_error or self.on_error
  63. self._queue = []
  64. def apply_entry(self, entry):
  65. try:
  66. entry()
  67. except Exception, exc:
  68. if not self.handle_error(exc):
  69. logger.error('Error in timer: %r', exc, exc_info=True)
  70. def handle_error(self, exc_info):
  71. if self.on_error:
  72. self.on_error(exc_info)
  73. return True
  74. def stop(self):
  75. pass
  76. def enter(self, entry, eta=None, priority=0):
  77. """Enter function into the scheduler.
  78. :param entry: Item to enter.
  79. :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
  80. :keyword priority: Unused.
  81. """
  82. if eta is None:
  83. eta = datetime.now()
  84. if isinstance(eta, datetime):
  85. try:
  86. eta = to_timestamp(eta)
  87. except OverflowError, exc:
  88. if not self.handle_error(exc):
  89. raise
  90. return
  91. return self._enter(eta, priority, entry)
  92. def _enter(self, eta, priority, entry):
  93. heapq.heappush(self._queue, (eta, priority, entry))
  94. return entry
  95. def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
  96. return self.enter(self.Entry(fun, args, kwargs), eta, priority)
  97. def enter_after(self, msecs, entry, priority=0):
  98. eta = datetime.now() + timedelta(seconds=msecs / 1000.0)
  99. return self.enter(entry, eta, priority)
  100. def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
  101. return self.enter_after(msecs, self.Entry(fun, args, kwargs), priority)
  102. def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
  103. tref = self.Entry(fun, args, kwargs)
  104. secs = msecs * 1000.0
  105. @wraps(fun)
  106. def _reschedules(*args, **kwargs):
  107. last, now = tref._last_run, time()
  108. lsince = (now - tref._last_run) * 1000.0 if last else msecs
  109. try:
  110. if lsince and lsince >= msecs:
  111. tref._last_run = now
  112. return fun(*args, **kwargs)
  113. finally:
  114. if not tref.cancelled:
  115. last = tref._last_run
  116. next = secs - (now - last) if last else secs
  117. self.enter_after(next / 1000.0, tref, priority)
  118. tref.fun = _reschedules
  119. tref._last_run = None
  120. return self.enter_after(msecs, tref, priority)
  121. @property
  122. def schedule(self):
  123. return self
  124. def __iter__(self):
  125. """The iterator yields the time to sleep for between runs."""
  126. # localize variable access
  127. nowfun = time
  128. pop = heapq.heappop
  129. max_interval = self.max_interval
  130. queue = self._queue
  131. while 1:
  132. if queue:
  133. eta, priority, entry = verify = queue[0]
  134. now = nowfun()
  135. if now < eta:
  136. yield min(eta - now, max_interval), None
  137. else:
  138. event = pop(queue)
  139. if event is verify:
  140. if not entry.cancelled:
  141. yield None, entry
  142. continue
  143. else:
  144. heapq.heappush(queue, event)
  145. else:
  146. yield None, None
  147. def empty(self):
  148. """Is the schedule empty?"""
  149. return not self._queue
  150. def clear(self):
  151. self._queue[:] = [] # used because we can't replace the object
  152. # and the operation is atomic.
  153. def info(self):
  154. return ({'eta': eta, 'priority': priority, 'item': item}
  155. for eta, priority, item in self.queue)
  156. def cancel(self, tref):
  157. tref.cancel()
  158. @property
  159. def queue(self):
  160. events = list(self._queue)
  161. return map(heapq.heappop, [events] * len(events))
  162. class Timer(Thread):
  163. Entry = Entry
  164. Schedule = Schedule
  165. running = False
  166. on_tick = None
  167. _timer_count = count(1).next
  168. if TIMER_DEBUG: # pragma: no cover
  169. def start(self, *args, **kwargs):
  170. import traceback
  171. print('- Timer starting')
  172. traceback.print_stack()
  173. super(Timer, self).start(*args, **kwargs)
  174. def __init__(self, schedule=None, on_error=None, on_tick=None,
  175. max_interval=None, **kwargs):
  176. self.schedule = schedule or self.Schedule(on_error=on_error,
  177. max_interval=max_interval)
  178. self.on_tick = on_tick or self.on_tick
  179. Thread.__init__(self)
  180. self._is_shutdown = Event()
  181. self._is_stopped = Event()
  182. self.mutex = Lock()
  183. self.not_empty = Condition(self.mutex)
  184. self.setDaemon(True)
  185. self.setName('Timer-%s' % (self._timer_count(), ))
  186. def _next_entry(self):
  187. with self.not_empty:
  188. delay, entry = self.scheduler.next()
  189. if entry is None:
  190. if delay is None:
  191. self.not_empty.wait(1.0)
  192. return delay
  193. return self.schedule.apply_entry(entry)
  194. __next__ = next = _next_entry # for 2to3
  195. def run(self):
  196. try:
  197. self.running = True
  198. self.scheduler = iter(self.schedule)
  199. while not self._is_shutdown.isSet():
  200. delay = self._next_entry()
  201. if delay:
  202. if self.on_tick:
  203. self.on_tick(delay)
  204. if sleep is None: # pragma: no cover
  205. break
  206. sleep(delay)
  207. try:
  208. self._is_stopped.set()
  209. except TypeError: # pragma: no cover
  210. # we lost the race at interpreter shutdown,
  211. # so gc collected built-in modules.
  212. pass
  213. except Exception, exc:
  214. logger.error('Thread Timer crashed: %r', exc, exc_info=True)
  215. os._exit(1)
  216. def stop(self):
  217. if self.running:
  218. self._is_shutdown.set()
  219. self._is_stopped.wait()
  220. self.join(1e10)
  221. self.running = False
  222. def ensure_started(self):
  223. if not self.running and not self.isAlive():
  224. self.start()
  225. def _do_enter(self, meth, *args, **kwargs):
  226. self.ensure_started()
  227. with self.mutex:
  228. entry = getattr(self.schedule, meth)(*args, **kwargs)
  229. self.not_empty.notify()
  230. return entry
  231. def enter(self, entry, eta, priority=None):
  232. return self._do_enter('enter', entry, eta, priority=priority)
  233. def apply_at(self, *args, **kwargs):
  234. return self._do_enter('apply_at', *args, **kwargs)
  235. def enter_after(self, *args, **kwargs):
  236. return self._do_enter('enter_after', *args, **kwargs)
  237. def apply_after(self, *args, **kwargs):
  238. return self._do_enter('apply_after', *args, **kwargs)
  239. def apply_interval(self, *args, **kwargs):
  240. return self._do_enter('apply_interval', *args, **kwargs)
  241. def exit_after(self, msecs, priority=10):
  242. self.apply_after(msecs, sys.exit, priority)
  243. def cancel(self, tref):
  244. tref.cancel()
  245. def clear(self):
  246. self.schedule.clear()
  247. def empty(self):
  248. return self.schedule.empty()
  249. @property
  250. def queue(self):
  251. return self.schedule.queue
  252. default_timer = _default_timer = Timer()
  253. apply_after = _default_timer.apply_after
  254. apply_at = _default_timer.apply_at
  255. apply_interval = _default_timer.apply_interval
  256. enter_after = _default_timer.enter_after
  257. enter = _default_timer.enter
  258. exit_after = _default_timer.exit_after
  259. cancel = _default_timer.cancel
  260. clear = _default_timer.clear
  261. atexit.register(_default_timer.stop)