timer2.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
# -*- coding: utf-8 -*-
"""
    timer2
    ~~~~~~

    Scheduler for Python functions.

    :copyright: (c) 2009 - 2012 by Ask Solem.
    :license: BSD, see LICENSE for more details.

"""
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import atexit
  12. import heapq
  13. import os
  14. import sys
  15. from itertools import count
  16. from threading import Condition, Event, Lock, Thread
  17. from time import time, sleep, mktime
  18. from datetime import datetime, timedelta
  19. from kombu.log import get_logger
# Package metadata.
VERSION = (1, 0, 0)
__version__ = ".".join(map(str, VERSION))  # dotted string form of VERSION
__author__ = "Ask Solem"
__contact__ = "ask@celeryproject.org"
__homepage__ = "http://github.com/ask/timer2/"
__docformat__ = "restructuredtext"

# Default for ``Schedule(max_interval=...)``: the largest delay, in seconds,
# that the schedule iterator yields in one step.
DEFAULT_MAX_INTERVAL = 2

# Module logger, obtained from kombu's logging helper.
logger = get_logger("timer2")
  28. class Entry(object):
  29. cancelled = False
  30. def __init__(self, fun, args=None, kwargs=None):
  31. self.fun = fun
  32. self.args = args or []
  33. self.kwargs = kwargs or {}
  34. self.tref = self
  35. def __call__(self):
  36. return self.fun(*self.args, **self.kwargs)
  37. def cancel(self):
  38. self.tref.cancelled = True
  39. def __repr__(self):
  40. return "<TimerEntry: %s(*%r, **%r)" % (
  41. self.fun.__name__, self.args, self.kwargs)
  42. if sys.version_info >= (3, 0):
  43. def __hash__(self):
  44. return hash("|".join(map(repr, (self.fun, self.args,
  45. self.kwargs))))
  46. def __lt__(self, other):
  47. return hash(self) < hash(other)
  48. def __gt__(self, other):
  49. return hash(self) > hash(other)
  50. def __eq__(self, other):
  51. return hash(self) == hash(other)
  52. def to_timestamp(d):
  53. if isinstance(d, datetime):
  54. return mktime(d.timetuple())
  55. return d
  56. class Schedule(object):
  57. """ETA scheduler."""
  58. on_error = None
  59. def __init__(self, max_interval=DEFAULT_MAX_INTERVAL, on_error=None):
  60. self.max_interval = float(max_interval)
  61. self.on_error = on_error or self.on_error
  62. self._queue = []
  63. def handle_error(self, exc_info):
  64. if self.on_error:
  65. self.on_error(exc_info)
  66. return True
  67. def enter(self, entry, eta=None, priority=0):
  68. """Enter function into the scheduler.
  69. :param entry: Item to enter.
  70. :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
  71. :keyword priority: Unused.
  72. """
  73. if eta is None: # schedule now
  74. eta = datetime.now()
  75. try:
  76. eta = to_timestamp(eta)
  77. except OverflowError:
  78. if not self.handle_error(sys.exc_info()):
  79. raise
  80. if eta is None:
  81. # schedule now.
  82. eta = time()
  83. heapq.heappush(self._queue, (eta, priority, entry))
  84. return entry
  85. def __iter__(self):
  86. """The iterator yields the time to sleep for between runs."""
  87. # localize variable access
  88. nowfun = time
  89. pop = heapq.heappop
  90. max_interval = self.max_interval
  91. queue = self._queue
  92. while 1:
  93. if queue:
  94. eta, priority, entry = verify = queue[0]
  95. now = nowfun()
  96. if now < eta:
  97. yield min(eta - now, max_interval), None
  98. else:
  99. event = pop(queue)
  100. if event is verify:
  101. if not entry.cancelled:
  102. yield None, entry
  103. continue
  104. else:
  105. heapq.heappush(queue, event)
  106. yield None, None
  107. def empty(self):
  108. """Is the schedule empty?"""
  109. return not self._queue
  110. def clear(self):
  111. self._queue[:] = [] # used because we can't replace the object
  112. # and the operation is atomic.
  113. def info(self):
  114. return ({"eta": eta, "priority": priority, "item": item}
  115. for eta, priority, item in self.queue)
  116. @property
  117. def queue(self):
  118. events = list(self._queue)
  119. return map(heapq.heappop, [events] * len(events))
  120. class Timer(Thread):
  121. Entry = Entry
  122. Schedule = Schedule
  123. running = False
  124. on_tick = None
  125. _timer_count = count(1).next
  126. def __init__(self, schedule=None, on_error=None, on_tick=None, **kwargs):
  127. self.schedule = schedule or self.Schedule(on_error=on_error)
  128. self.on_tick = on_tick or self.on_tick
  129. Thread.__init__(self)
  130. self._is_shutdown = Event()
  131. self._is_stopped = Event()
  132. self.mutex = Lock()
  133. self.not_empty = Condition(self.mutex)
  134. self.setDaemon(True)
  135. self.setName("Timer-%s" % (self._timer_count(), ))
  136. def apply_entry(self, entry):
  137. try:
  138. entry()
  139. except Exception, exc:
  140. exc_info = sys.exc_info()
  141. try:
  142. if not self.schedule.handle_error(exc_info):
  143. logger.error("Error in timer: %r\n", exc, exc_info=True)
  144. finally:
  145. del(exc_info)
  146. def _next_entry(self):
  147. with self.not_empty:
  148. delay, entry = self.scheduler.next()
  149. if entry is None:
  150. if delay is None:
  151. self.not_empty.wait(1.0)
  152. return delay
  153. return self.apply_entry(entry)
  154. __next__ = next = _next_entry # for 2to3
  155. def run(self):
  156. try:
  157. self.running = True
  158. self.scheduler = iter(self.schedule)
  159. while not self._is_shutdown.isSet():
  160. delay = self._next_entry()
  161. if delay:
  162. if self.on_tick:
  163. self.on_tick(delay)
  164. if sleep is None: # pragma: no cover
  165. break
  166. sleep(delay)
  167. try:
  168. self._is_stopped.set()
  169. except TypeError: # pragma: no cover
  170. # we lost the race at interpreter shutdown,
  171. # so gc collected built-in modules.
  172. pass
  173. except Exception, exc:
  174. logger.error("Thread Timer crashed: %r", exc, exc_info=True)
  175. os._exit(1)
  176. def stop(self):
  177. if self.running:
  178. self._is_shutdown.set()
  179. self._is_stopped.wait()
  180. self.join(1e10)
  181. self.running = False
  182. def ensure_started(self):
  183. if not self.running and not self.isAlive():
  184. self.start()
  185. def enter(self, entry, eta, priority=None):
  186. self.ensure_started()
  187. with self.mutex:
  188. entry = self.schedule.enter(entry, eta, priority)
  189. self.not_empty.notify()
  190. return entry
  191. def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
  192. return self.enter(self.Entry(fun, args, kwargs), eta, priority)
  193. def enter_after(self, msecs, entry, priority=0):
  194. eta = datetime.now() + timedelta(seconds=msecs / 1000.0)
  195. return self.enter(entry, eta, priority)
  196. def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
  197. return self.enter_after(msecs, Entry(fun, args, kwargs), priority)
  198. def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
  199. tref = Entry(fun, args, kwargs)
  200. def _reschedules(*args, **kwargs):
  201. try:
  202. return fun(*args, **kwargs)
  203. finally:
  204. if not tref.cancelled:
  205. self.enter_after(msecs, tref, priority)
  206. tref.fun = _reschedules
  207. return self.enter_after(msecs, tref, priority)
  208. def exit_after(self, msecs, priority=10):
  209. self.apply_after(msecs, sys.exit, priority)
  210. def cancel(self, tref):
  211. tref.cancel()
  212. def clear(self):
  213. self.schedule.clear()
  214. def empty(self):
  215. return self.schedule.empty()
  216. @property
  217. def queue(self):
  218. return self.schedule.queue
# Shared module-level timer.  Creating it does not start the thread; that
# happens on the first call to ``enter`` (via ``ensure_started``).
default_timer = _default_timer = Timer()

# Module-level shortcuts bound to the shared timer.
apply_after = _default_timer.apply_after
apply_at = _default_timer.apply_at
apply_interval = _default_timer.apply_interval
enter_after = _default_timer.enter_after
enter = _default_timer.enter
exit_after = _default_timer.exit_after
cancel = _default_timer.cancel
clear = _default_timer.clear

# Stop the shared timer when the interpreter exits.
atexit.register(_default_timer.stop)