timer2.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. # -*- coding: utf-8 -*-
  2. """
  3. timer2
  4. ~~~~~~
  5. Scheduler for Python functions.
  6. """
  7. from __future__ import absolute_import
  8. import heapq
  9. import os
  10. import sys
  11. import threading
  12. from collections import namedtuple
  13. from datetime import datetime
  14. from functools import wraps
  15. from itertools import count
  16. from time import time, sleep
  17. from weakref import proxy as weakrefproxy
  18. from celery.five import THREAD_TIMEOUT_MAX
  19. from celery.utils.timeutils import timedelta_seconds, timezone
  20. from kombu.log import get_logger
  21. VERSION = (1, 0, 0)
  22. __version__ = '.'.join(str(p) for p in VERSION)
  23. __author__ = 'Ask Solem'
  24. __contact__ = 'ask@celeryproject.org'
  25. __homepage__ = 'http://github.com/ask/timer2/'
  26. __docformat__ = 'restructuredtext'
  27. DEFAULT_MAX_INTERVAL = 2
  28. TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
  29. EPOCH = datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc)
  30. IS_PYPY = hasattr(sys, 'pypy_version_info')
  31. logger = get_logger('timer2')
  32. __all__ = ['Entry', 'Schedule', 'Timer', 'to_timestamp']
  33. scheduled = namedtuple('scheduled', ('eta', 'priority', 'entry'))
  34. class Entry(object):
  35. if not IS_PYPY: # pragma: no cover
  36. __slots__ = (
  37. 'fun', 'args', 'kwargs', 'tref', 'cancelled',
  38. '_last_run', '__weakref__',
  39. )
  40. def __init__(self, fun, args=None, kwargs=None):
  41. self.fun = fun
  42. self.args = args or []
  43. self.kwargs = kwargs or {}
  44. self.tref = weakrefproxy(self)
  45. self._last_run = None
  46. self.cancelled = False
  47. def __call__(self):
  48. return self.fun(*self.args, **self.kwargs)
  49. def cancel(self):
  50. try:
  51. self.tref.cancelled = True
  52. except ReferenceError: # pragma: no cover
  53. pass
  54. def __repr__(self):
  55. return '<TimerEntry: {0}(*{1!r}, **{2!r})'.format(
  56. self.fun.__name__, self.args, self.kwargs)
  57. if sys.version_info[0] == 3: # pragma: no cover
  58. def __hash__(self):
  59. return hash('{0.fun!r}|{0.args!r}|{0.kwargs!r}'.format(self))
  60. def __lt__(self, other):
  61. return hash(self) < hash(other)
  62. def __gt__(self, other):
  63. return hash(self) > hash(other)
  64. def __eq__(self, other):
  65. return hash(self) == hash(other)
  66. def __ne__(self, other):
  67. return not self.__eq__(other)
  68. def to_timestamp(d, default_timezone=timezone.utc):
  69. if isinstance(d, datetime):
  70. if d.tzinfo is None:
  71. d = d.replace(tzinfo=default_timezone)
  72. return timedelta_seconds(d - EPOCH)
  73. return d
  74. class Schedule(object):
  75. """ETA scheduler."""
  76. Entry = Entry
  77. on_error = None
  78. def __init__(self, max_interval=None, on_error=None, **kwargs):
  79. self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
  80. self.on_error = on_error or self.on_error
  81. self._queue = []
  82. def apply_entry(self, entry):
  83. try:
  84. entry()
  85. except Exception as exc:
  86. if not self.handle_error(exc):
  87. logger.error('Error in timer: %r', exc, exc_info=True)
  88. def handle_error(self, exc_info):
  89. if self.on_error:
  90. self.on_error(exc_info)
  91. return True
  92. def stop(self):
  93. pass
  94. def enter(self, entry, eta=None, priority=0):
  95. """Enter function into the scheduler.
  96. :param entry: Item to enter.
  97. :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
  98. :keyword priority: Unused.
  99. """
  100. if eta is None:
  101. eta = time()
  102. if isinstance(eta, datetime):
  103. try:
  104. eta = to_timestamp(eta)
  105. except Exception as exc:
  106. if not self.handle_error(exc):
  107. raise
  108. return
  109. return self._enter(eta, priority, entry)
  110. def _enter(self, eta, priority, entry):
  111. heapq.heappush(self._queue, scheduled(eta, priority, entry))
  112. return entry
  113. def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
  114. return self.enter(self.Entry(fun, args, kwargs), eta, priority)
  115. def enter_after(self, msecs, entry, priority=0, time=time):
  116. return self.enter(entry, time() + (msecs / 1000.0), priority)
  117. def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
  118. return self.enter_after(msecs, self.Entry(fun, args, kwargs), priority)
  119. def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
  120. tref = self.Entry(fun, args, kwargs)
  121. secs = msecs * 1000.0
  122. @wraps(fun)
  123. def _reschedules(*args, **kwargs):
  124. last, now = tref._last_run, time()
  125. lsince = (now - tref._last_run) * 1000.0 if last else msecs
  126. try:
  127. if lsince and lsince >= msecs:
  128. tref._last_run = now
  129. return fun(*args, **kwargs)
  130. finally:
  131. if not tref.cancelled:
  132. last = tref._last_run
  133. next = secs - (now - last) if last else secs
  134. self.enter_after(next / 1000.0, tref, priority)
  135. tref.fun = _reschedules
  136. tref._last_run = None
  137. return self.enter_after(msecs, tref, priority)
  138. @property
  139. def schedule(self):
  140. return self
  141. def __iter__(self, min=min, nowfun=time, pop=heapq.heappop,
  142. push=heapq.heappush):
  143. """The iterator yields the time to sleep for between runs."""
  144. max_interval = self.max_interval
  145. queue = self._queue
  146. while 1:
  147. if queue:
  148. eventA = queue[0]
  149. now, eta = nowfun(), eventA[0]
  150. if now < eta:
  151. yield min(eta - now, max_interval), None
  152. else:
  153. eventB = pop(queue)
  154. if eventB is eventA:
  155. entry = eventA[2]
  156. if not entry.cancelled:
  157. yield None, entry
  158. continue
  159. else:
  160. push(queue, eventB)
  161. else:
  162. yield None, None
  163. def empty(self):
  164. """Is the schedule empty?"""
  165. return not self._queue
  166. def clear(self):
  167. self._queue[:] = [] # atomic, without creating a new list.
  168. def info(self):
  169. return ({'eta': eta, 'priority': priority, 'item': item}
  170. for eta, priority, item in self.queue)
  171. def cancel(self, tref):
  172. tref.cancel()
  173. @property
  174. def queue(self, _pop=heapq.heappop):
  175. """Snapshot of underlying datastructure."""
  176. events = list(self._queue)
  177. return [_pop(v) for v in [events] * len(events)]
  178. class Timer(threading.Thread):
  179. Entry = Entry
  180. Schedule = Schedule
  181. running = False
  182. on_tick = None
  183. _timer_count = count(1)
  184. if TIMER_DEBUG: # pragma: no cover
  185. def start(self, *args, **kwargs):
  186. import traceback
  187. print('- Timer starting')
  188. traceback.print_stack()
  189. super(Timer, self).start(*args, **kwargs)
  190. def __init__(self, schedule=None, on_error=None, on_tick=None,
  191. on_start=None, max_interval=None, **kwargs):
  192. self.schedule = schedule or self.Schedule(on_error=on_error,
  193. max_interval=max_interval)
  194. self.on_start = on_start
  195. self.on_tick = on_tick or self.on_tick
  196. threading.Thread.__init__(self)
  197. self._is_shutdown = threading.Event()
  198. self._is_stopped = threading.Event()
  199. self.mutex = threading.Lock()
  200. self.not_empty = threading.Condition(self.mutex)
  201. self.daemon = True
  202. self.name = 'Timer-{0}'.format(next(self._timer_count))
  203. def _next_entry(self):
  204. with self.not_empty:
  205. delay, entry = next(self.scheduler)
  206. if entry is None:
  207. if delay is None:
  208. self.not_empty.wait(1.0)
  209. return delay
  210. return self.schedule.apply_entry(entry)
  211. __next__ = next = _next_entry # for 2to3
  212. def run(self):
  213. try:
  214. self.running = True
  215. self.scheduler = iter(self.schedule)
  216. while not self._is_shutdown.isSet():
  217. delay = self._next_entry()
  218. if delay:
  219. if self.on_tick:
  220. self.on_tick(delay)
  221. if sleep is None: # pragma: no cover
  222. break
  223. sleep(delay)
  224. try:
  225. self._is_stopped.set()
  226. except TypeError: # pragma: no cover
  227. # we lost the race at interpreter shutdown,
  228. # so gc collected built-in modules.
  229. pass
  230. except Exception as exc:
  231. logger.error('Thread Timer crashed: %r', exc, exc_info=True)
  232. os._exit(1)
  233. def stop(self):
  234. if self.running:
  235. self._is_shutdown.set()
  236. self._is_stopped.wait()
  237. self.join(THREAD_TIMEOUT_MAX)
  238. self.running = False
  239. def ensure_started(self):
  240. if not self.running and not self.isAlive():
  241. if self.on_start:
  242. self.on_start(self)
  243. self.start()
  244. def _do_enter(self, meth, *args, **kwargs):
  245. self.ensure_started()
  246. with self.mutex:
  247. entry = getattr(self.schedule, meth)(*args, **kwargs)
  248. self.not_empty.notify()
  249. return entry
  250. def enter(self, entry, eta, priority=None):
  251. return self._do_enter('enter', entry, eta, priority=priority)
  252. def apply_at(self, *args, **kwargs):
  253. return self._do_enter('apply_at', *args, **kwargs)
  254. def enter_after(self, *args, **kwargs):
  255. return self._do_enter('enter_after', *args, **kwargs)
  256. def apply_after(self, *args, **kwargs):
  257. return self._do_enter('apply_after', *args, **kwargs)
  258. def apply_interval(self, *args, **kwargs):
  259. return self._do_enter('apply_interval', *args, **kwargs)
  260. def exit_after(self, msecs, priority=10):
  261. self.apply_after(msecs, sys.exit, priority)
  262. def cancel(self, tref):
  263. tref.cancel()
  264. def clear(self):
  265. self.schedule.clear()
  266. def empty(self):
  267. return self.schedule.empty()
  268. @property
  269. def queue(self):
  270. return self.schedule.queue