timer2.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. # -*- coding: utf-8 -*-
  2. """
  3. timer2
  4. ~~~~~~
  5. Scheduler for Python functions.
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import atexit
  10. import heapq
  11. import os
  12. import sys
  13. import threading
  14. from datetime import datetime
  15. from functools import wraps
  16. from itertools import count
  17. from time import time, sleep
  18. from celery.utils.compat import THREAD_TIMEOUT_MAX
  19. from celery.utils.timeutils import timedelta_seconds, timezone
  20. from kombu.log import get_logger
  21. VERSION = (1, 0, 0)
  22. __version__ = '.'.join(map(str, VERSION))
  23. __author__ = 'Ask Solem'
  24. __contact__ = 'ask@celeryproject.org'
  25. __homepage__ = 'http://github.com/ask/timer2/'
  26. __docformat__ = 'restructuredtext'
  27. DEFAULT_MAX_INTERVAL = 2
  28. TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
  29. EPOCH = datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc)
  30. logger = get_logger('timer2')
  31. class Entry(object):
  32. cancelled = False
  33. def __init__(self, fun, args=None, kwargs=None):
  34. self.fun = fun
  35. self.args = args or []
  36. self.kwargs = kwargs or {}
  37. self.tref = self
  38. def __call__(self):
  39. return self.fun(*self.args, **self.kwargs)
  40. def cancel(self):
  41. self.tref.cancelled = True
  42. def __repr__(self):
  43. return '<TimerEntry: %s(*%r, **%r)' % (
  44. self.fun.__name__, self.args, self.kwargs)
  45. if sys.version_info[0] == 3: # pragma: no cover
  46. def __hash__(self):
  47. return hash('|'.join(map(repr, (self.fun, self.args,
  48. self.kwargs))))
  49. def __lt__(self, other):
  50. return hash(self) < hash(other)
  51. def __gt__(self, other):
  52. return hash(self) > hash(other)
  53. def __eq__(self, other):
  54. return hash(self) == hash(other)
  55. def to_timestamp(d, default_timezone=timezone.utc):
  56. if isinstance(d, datetime):
  57. if d.tzinfo is None:
  58. d = d.replace(tzinfo=default_timezone)
  59. return timedelta_seconds(d - EPOCH)
  60. return d
  61. class Schedule(object):
  62. """ETA scheduler."""
  63. Entry = Entry
  64. on_error = None
  65. def __init__(self, max_interval=None, on_error=None, **kwargs):
  66. self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
  67. self.on_error = on_error or self.on_error
  68. self._queue = []
  69. def apply_entry(self, entry):
  70. try:
  71. entry()
  72. except Exception, exc:
  73. if not self.handle_error(exc):
  74. logger.error('Error in timer: %r', exc, exc_info=True)
  75. def handle_error(self, exc_info):
  76. if self.on_error:
  77. self.on_error(exc_info)
  78. return True
  79. def stop(self):
  80. pass
  81. def enter(self, entry, eta=None, priority=0):
  82. """Enter function into the scheduler.
  83. :param entry: Item to enter.
  84. :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
  85. :keyword priority: Unused.
  86. """
  87. if eta is None:
  88. eta = time()
  89. if isinstance(eta, datetime):
  90. try:
  91. eta = to_timestamp(eta)
  92. except Exception, exc:
  93. if not self.handle_error(exc):
  94. raise
  95. return
  96. return self._enter(eta, priority, entry)
  97. def _enter(self, eta, priority, entry):
  98. heapq.heappush(self._queue, (eta, priority, entry))
  99. return entry
  100. def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
  101. return self.enter(self.Entry(fun, args, kwargs), eta, priority)
  102. def enter_after(self, msecs, entry, priority=0):
  103. return self.enter(entry, time() + (msecs / 1000.0), priority)
  104. def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
  105. return self.enter_after(msecs, self.Entry(fun, args, kwargs), priority)
  106. def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
  107. tref = self.Entry(fun, args, kwargs)
  108. secs = msecs * 1000.0
  109. @wraps(fun)
  110. def _reschedules(*args, **kwargs):
  111. last, now = tref._last_run, time()
  112. lsince = (now - tref._last_run) * 1000.0 if last else msecs
  113. try:
  114. if lsince and lsince >= msecs:
  115. tref._last_run = now
  116. return fun(*args, **kwargs)
  117. finally:
  118. if not tref.cancelled:
  119. last = tref._last_run
  120. next = secs - (now - last) if last else secs
  121. self.enter_after(next / 1000.0, tref, priority)
  122. tref.fun = _reschedules
  123. tref._last_run = None
  124. return self.enter_after(msecs, tref, priority)
  125. @property
  126. def schedule(self):
  127. return self
  128. def __iter__(self):
  129. """The iterator yields the time to sleep for between runs."""
  130. # localize variable access
  131. nowfun = time
  132. pop = heapq.heappop
  133. max_interval = self.max_interval
  134. queue = self._queue
  135. while 1:
  136. if queue:
  137. eta, priority, entry = verify = queue[0]
  138. now = nowfun()
  139. if now < eta:
  140. yield min(eta - now, max_interval), None
  141. else:
  142. event = pop(queue)
  143. if event is verify:
  144. if not entry.cancelled:
  145. yield None, entry
  146. continue
  147. else:
  148. heapq.heappush(queue, event)
  149. else:
  150. yield None, None
  151. def empty(self):
  152. """Is the schedule empty?"""
  153. return not self._queue
  154. def clear(self):
  155. self._queue[:] = [] # used because we can't replace the object
  156. # and the operation is atomic.
  157. def info(self):
  158. return ({'eta': eta, 'priority': priority, 'item': item}
  159. for eta, priority, item in self.queue)
  160. def cancel(self, tref):
  161. tref.cancel()
  162. @property
  163. def queue(self):
  164. events = list(self._queue)
  165. return map(heapq.heappop, [events] * len(events))
  166. class Timer(threading.Thread):
  167. Entry = Entry
  168. Schedule = Schedule
  169. running = False
  170. on_tick = None
  171. _timer_count = count(1).next
  172. if TIMER_DEBUG: # pragma: no cover
  173. def start(self, *args, **kwargs):
  174. import traceback
  175. print('- Timer starting')
  176. traceback.print_stack()
  177. super(Timer, self).start(*args, **kwargs)
  178. def __init__(self, schedule=None, on_error=None, on_tick=None,
  179. max_interval=None, **kwargs):
  180. self.schedule = schedule or self.Schedule(on_error=on_error,
  181. max_interval=max_interval)
  182. self.on_tick = on_tick or self.on_tick
  183. threading.Thread.__init__(self)
  184. self._is_shutdown = threading.Event()
  185. self._is_stopped = threading.Event()
  186. self.mutex = threading.Lock()
  187. self.not_empty = threading.Condition(self.mutex)
  188. self.setDaemon(True)
  189. self.setName('Timer-%s' % (self._timer_count(), ))
  190. def _next_entry(self):
  191. with self.not_empty:
  192. delay, entry = self.scheduler.next()
  193. if entry is None:
  194. if delay is None:
  195. self.not_empty.wait(1.0)
  196. return delay
  197. return self.schedule.apply_entry(entry)
  198. __next__ = next = _next_entry # for 2to3
  199. def run(self):
  200. try:
  201. self.running = True
  202. self.scheduler = iter(self.schedule)
  203. while not self._is_shutdown.isSet():
  204. delay = self._next_entry()
  205. if delay:
  206. if self.on_tick:
  207. self.on_tick(delay)
  208. if sleep is None: # pragma: no cover
  209. break
  210. sleep(delay)
  211. try:
  212. self._is_stopped.set()
  213. except TypeError: # pragma: no cover
  214. # we lost the race at interpreter shutdown,
  215. # so gc collected built-in modules.
  216. pass
  217. except Exception, exc:
  218. logger.error('Thread Timer crashed: %r', exc, exc_info=True)
  219. os._exit(1)
  220. def stop(self):
  221. if self.running:
  222. self._is_shutdown.set()
  223. self._is_stopped.wait()
  224. self.join(THREAD_TIMEOUT_MAX)
  225. self.running = False
  226. def ensure_started(self):
  227. if not self.running and not self.isAlive():
  228. self.start()
  229. def _do_enter(self, meth, *args, **kwargs):
  230. self.ensure_started()
  231. with self.mutex:
  232. entry = getattr(self.schedule, meth)(*args, **kwargs)
  233. self.not_empty.notify()
  234. return entry
  235. def enter(self, entry, eta, priority=None):
  236. return self._do_enter('enter', entry, eta, priority=priority)
  237. def apply_at(self, *args, **kwargs):
  238. return self._do_enter('apply_at', *args, **kwargs)
  239. def enter_after(self, *args, **kwargs):
  240. return self._do_enter('enter_after', *args, **kwargs)
  241. def apply_after(self, *args, **kwargs):
  242. return self._do_enter('apply_after', *args, **kwargs)
  243. def apply_interval(self, *args, **kwargs):
  244. return self._do_enter('apply_interval', *args, **kwargs)
  245. def exit_after(self, msecs, priority=10):
  246. self.apply_after(msecs, sys.exit, priority)
  247. def cancel(self, tref):
  248. tref.cancel()
  249. def clear(self):
  250. self.schedule.clear()
  251. def empty(self):
  252. return self.schedule.empty()
  253. @property
  254. def queue(self):
  255. return self.schedule.queue
  256. default_timer = _default_timer = Timer()
  257. apply_after = _default_timer.apply_after
  258. apply_at = _default_timer.apply_at
  259. apply_interval = _default_timer.apply_interval
  260. enter_after = _default_timer.enter_after
  261. enter = _default_timer.enter
  262. exit_after = _default_timer.exit_after
  263. cancel = _default_timer.cancel
  264. clear = _default_timer.clear
  265. atexit.register(_default_timer.stop)