timer2.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. # -*- coding: utf-8 -*-
  2. """
  3. timer2
  4. ~~~~~~
  5. Scheduler for Python functions.
  6. """
  7. from __future__ import absolute_import
  8. import atexit
  9. import heapq
  10. import os
  11. import sys
  12. import threading
  13. from datetime import datetime
  14. from functools import wraps
  15. from itertools import count
  16. from time import time, sleep
  17. from weakref import proxy as weakrefproxy
  18. from celery.five import THREAD_TIMEOUT_MAX, map
  19. from celery.utils.timeutils import timedelta_seconds, timezone
  20. from kombu.log import get_logger
  21. VERSION = (1, 0, 0)
  22. __version__ = '.'.join(map(str, VERSION))
  23. __author__ = 'Ask Solem'
  24. __contact__ = 'ask@celeryproject.org'
  25. __homepage__ = 'http://github.com/ask/timer2/'
  26. __docformat__ = 'restructuredtext'
  27. DEFAULT_MAX_INTERVAL = 2
  28. TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
  29. EPOCH = datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc)
  30. IS_PYPY = hasattr(sys, 'pypy_version_info')
# Module-level logger; used to report errors raised by timer entries and
# crashes of the timer thread.
logger = get_logger('timer2')
  32. class Entry(object):
  33. if not IS_PYPY:
  34. __slots__ = (
  35. 'fun', 'args', 'kwargs', 'tref', 'cancelled',
  36. '_last_run', '__weakref__',
  37. )
  38. def __init__(self, fun, args=None, kwargs=None):
  39. self.fun = fun
  40. self.args = args or []
  41. self.kwargs = kwargs or {}
  42. self.tref = weakrefproxy(self)
  43. self._last_run = None
  44. self.cancelled = False
  45. def __call__(self):
  46. return self.fun(*self.args, **self.kwargs)
  47. def cancel(self):
  48. try:
  49. self.tref.cancelled = True
  50. except ReferenceError:
  51. pass
  52. def __repr__(self):
  53. return '<TimerEntry: {0}(*{1!r}, **{2!r})'.format(
  54. self.fun.__name__, self.args, self.kwargs)
  55. if sys.version_info[0] == 3: # pragma: no cover
  56. def __hash__(self):
  57. return hash('{0.fun!r}|{0.args!r}|{0.kwargs!r}'.format(self))
  58. def __lt__(self, other):
  59. return hash(self) < hash(other)
  60. def __gt__(self, other):
  61. return hash(self) > hash(other)
  62. def __eq__(self, other):
  63. return hash(self) == hash(other)
  64. def to_timestamp(d, default_timezone=timezone.utc):
  65. if isinstance(d, datetime):
  66. if d.tzinfo is None:
  67. d = d.replace(tzinfo=default_timezone)
  68. return timedelta_seconds(d - EPOCH)
  69. return d
  70. class Schedule(object):
  71. """ETA scheduler."""
  72. Entry = Entry
  73. on_error = None
  74. def __init__(self, max_interval=None, on_error=None, **kwargs):
  75. self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
  76. self.on_error = on_error or self.on_error
  77. self._queue = []
  78. def apply_entry(self, entry):
  79. try:
  80. entry()
  81. except Exception as exc:
  82. if not self.handle_error(exc):
  83. logger.error('Error in timer: %r', exc, exc_info=True)
  84. def handle_error(self, exc_info):
  85. if self.on_error:
  86. self.on_error(exc_info)
  87. return True
  88. def stop(self):
  89. pass
  90. def enter(self, entry, eta=None, priority=0):
  91. """Enter function into the scheduler.
  92. :param entry: Item to enter.
  93. :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
  94. :keyword priority: Unused.
  95. """
  96. if eta is None:
  97. eta = time()
  98. if isinstance(eta, datetime):
  99. try:
  100. eta = to_timestamp(eta)
  101. except Exception as exc:
  102. if not self.handle_error(exc):
  103. raise
  104. return
  105. return self._enter(eta, priority, entry)
  106. def _enter(self, eta, priority, entry):
  107. heapq.heappush(self._queue, (eta, priority, entry))
  108. return entry
  109. def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
  110. return self.enter(self.Entry(fun, args, kwargs), eta, priority)
  111. def enter_after(self, msecs, entry, priority=0, time=time):
  112. return self.enter(entry, time() + (msecs / 1000.0), priority)
  113. def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
  114. return self.enter_after(msecs, self.Entry(fun, args, kwargs), priority)
  115. def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
  116. tref = self.Entry(fun, args, kwargs)
  117. secs = msecs * 1000.0
  118. @wraps(fun)
  119. def _reschedules(*args, **kwargs):
  120. last, now = tref._last_run, time()
  121. lsince = (now - tref._last_run) * 1000.0 if last else msecs
  122. try:
  123. if lsince and lsince >= msecs:
  124. tref._last_run = now
  125. return fun(*args, **kwargs)
  126. finally:
  127. if not tref.cancelled:
  128. last = tref._last_run
  129. next = secs - (now - last) if last else secs
  130. self.enter_after(next / 1000.0, tref, priority)
  131. tref.fun = _reschedules
  132. tref._last_run = None
  133. return self.enter_after(msecs, tref, priority)
  134. @property
  135. def schedule(self):
  136. return self
  137. def __iter__(self, min=min, nowfun=time, pop=heapq.heappop,
  138. push=heapq.heappush):
  139. """The iterator yields the time to sleep for between runs."""
  140. max_interval = self.max_interval
  141. queue = self._queue
  142. while 1:
  143. if queue:
  144. eta, priority, entry = verify = queue[0]
  145. now = nowfun()
  146. if now < eta:
  147. yield min(eta - now, max_interval), None
  148. else:
  149. event = pop(queue)
  150. if event is verify:
  151. if not entry.cancelled:
  152. yield None, entry
  153. continue
  154. else:
  155. push(queue, event)
  156. else:
  157. yield None, None
  158. def empty(self):
  159. """Is the schedule empty?"""
  160. return not self._queue
  161. def clear(self):
  162. self._queue[:] = [] # used because we can't replace the object
  163. # and the operation is atomic.
  164. def info(self):
  165. return ({'eta': eta, 'priority': priority, 'item': item}
  166. for eta, priority, item in self.queue)
  167. def cancel(self, tref):
  168. tref.cancel()
  169. @property
  170. def queue(self):
  171. events = list(self._queue)
  172. return [heapq.heappop(x) for x in [events] * len(events)]
  173. class Timer(threading.Thread):
  174. Entry = Entry
  175. Schedule = Schedule
  176. running = False
  177. on_tick = None
  178. _timer_count = count(1)
  179. if TIMER_DEBUG: # pragma: no cover
  180. def start(self, *args, **kwargs):
  181. import traceback
  182. print('- Timer starting')
  183. traceback.print_stack()
  184. super(Timer, self).start(*args, **kwargs)
  185. def __init__(self, schedule=None, on_error=None, on_tick=None,
  186. max_interval=None, **kwargs):
  187. self.schedule = schedule or self.Schedule(on_error=on_error,
  188. max_interval=max_interval)
  189. self.on_tick = on_tick or self.on_tick
  190. threading.Thread.__init__(self)
  191. self._is_shutdown = threading.Event()
  192. self._is_stopped = threading.Event()
  193. self.mutex = threading.Lock()
  194. self.not_empty = threading.Condition(self.mutex)
  195. self.daemon = True
  196. self.name = 'Timer-{0}'.format(next(self._timer_count))
  197. def _next_entry(self):
  198. with self.not_empty:
  199. delay, entry = next(self.scheduler)
  200. if entry is None:
  201. if delay is None:
  202. self.not_empty.wait(1.0)
  203. return delay
  204. return self.schedule.apply_entry(entry)
  205. __next__ = next = _next_entry # for 2to3
  206. def run(self):
  207. try:
  208. self.running = True
  209. self.scheduler = iter(self.schedule)
  210. while not self._is_shutdown.isSet():
  211. delay = self._next_entry()
  212. if delay:
  213. if self.on_tick:
  214. self.on_tick(delay)
  215. if sleep is None: # pragma: no cover
  216. break
  217. sleep(delay)
  218. try:
  219. self._is_stopped.set()
  220. except TypeError: # pragma: no cover
  221. # we lost the race at interpreter shutdown,
  222. # so gc collected built-in modules.
  223. pass
  224. except Exception as exc:
  225. logger.error('Thread Timer crashed: %r', exc, exc_info=True)
  226. os._exit(1)
  227. def stop(self):
  228. if self.running:
  229. self._is_shutdown.set()
  230. self._is_stopped.wait()
  231. self.join(THREAD_TIMEOUT_MAX)
  232. self.running = False
  233. def ensure_started(self):
  234. if not self.running and not self.isAlive():
  235. self.start()
  236. def _do_enter(self, meth, *args, **kwargs):
  237. self.ensure_started()
  238. with self.mutex:
  239. entry = getattr(self.schedule, meth)(*args, **kwargs)
  240. self.not_empty.notify()
  241. return entry
  242. def enter(self, entry, eta, priority=None):
  243. return self._do_enter('enter', entry, eta, priority=priority)
  244. def apply_at(self, *args, **kwargs):
  245. return self._do_enter('apply_at', *args, **kwargs)
  246. def enter_after(self, *args, **kwargs):
  247. return self._do_enter('enter_after', *args, **kwargs)
  248. def apply_after(self, *args, **kwargs):
  249. return self._do_enter('apply_after', *args, **kwargs)
  250. def apply_interval(self, *args, **kwargs):
  251. return self._do_enter('apply_interval', *args, **kwargs)
  252. def exit_after(self, msecs, priority=10):
  253. self.apply_after(msecs, sys.exit, priority)
  254. def cancel(self, tref):
  255. tref.cancel()
  256. def clear(self):
  257. self.schedule.clear()
  258. def empty(self):
  259. return self.schedule.empty()
  260. @property
  261. def queue(self):
  262. return self.schedule.queue
  263. default_timer = _default_timer = Timer()
  264. apply_after = _default_timer.apply_after
  265. apply_at = _default_timer.apply_at
  266. apply_interval = _default_timer.apply_interval
  267. enter_after = _default_timer.enter_after
  268. enter = _default_timer.enter
  269. exit_after = _default_timer.exit_after
  270. cancel = _default_timer.cancel
  271. clear = _default_timer.clear
  272. atexit.register(_default_timer.stop)