timer2.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. # -*- coding: utf-8 -*-
  2. """
  3. timer2
  4. ~~~~~~
  5. Scheduler for Python functions.
  6. """
  7. from __future__ import absolute_import
  8. import atexit
  9. import heapq
  10. import os
  11. import sys
  12. import threading
  13. from datetime import datetime
  14. from functools import wraps
  15. from itertools import count
  16. from time import time, sleep
  17. from celery.five import THREAD_TIMEOUT_MAX, map
  18. from celery.utils.timeutils import timedelta_seconds, timezone
  19. from kombu.log import get_logger
  20. VERSION = (1, 0, 0)
  21. __version__ = '.'.join(map(str, VERSION))
  22. __author__ = 'Ask Solem'
  23. __contact__ = 'ask@celeryproject.org'
  24. __homepage__ = 'http://github.com/ask/timer2/'
  25. __docformat__ = 'restructuredtext'
  26. DEFAULT_MAX_INTERVAL = 2
  27. TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
  28. EPOCH = datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc)
  29. logger = get_logger('timer2')
  30. class Entry(object):
  31. cancelled = False
  32. def __init__(self, fun, args=None, kwargs=None):
  33. self.fun = fun
  34. self.args = args or []
  35. self.kwargs = kwargs or {}
  36. self.tref = self
  37. def __call__(self):
  38. return self.fun(*self.args, **self.kwargs)
  39. def cancel(self):
  40. self.tref.cancelled = True
  41. def __repr__(self):
  42. return '<TimerEntry: {0}(*{1!r}, **{2!r})'.format(
  43. self.fun.__name__, self.args, self.kwargs)
  44. if sys.version_info[0] == 3: # pragma: no cover
  45. def __hash__(self):
  46. return hash('{0.fun!r}|{0.args!r}|{0.kwargs!r}'.format(self))
  47. def __lt__(self, other):
  48. return hash(self) < hash(other)
  49. def __gt__(self, other):
  50. return hash(self) > hash(other)
  51. def __eq__(self, other):
  52. return hash(self) == hash(other)
  53. def to_timestamp(d, default_timezone=timezone.utc):
  54. if isinstance(d, datetime):
  55. if d.tzinfo is None:
  56. d = d.replace(tzinfo=default_timezone)
  57. return timedelta_seconds(d - EPOCH)
  58. return d
  59. class Schedule(object):
  60. """ETA scheduler."""
  61. Entry = Entry
  62. on_error = None
  63. def __init__(self, max_interval=None, on_error=None, **kwargs):
  64. self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
  65. self.on_error = on_error or self.on_error
  66. self._queue = []
  67. def apply_entry(self, entry):
  68. try:
  69. entry()
  70. except Exception as exc:
  71. if not self.handle_error(exc):
  72. logger.error('Error in timer: %r', exc, exc_info=True)
  73. def handle_error(self, exc_info):
  74. if self.on_error:
  75. self.on_error(exc_info)
  76. return True
  77. def stop(self):
  78. pass
  79. def enter(self, entry, eta=None, priority=0):
  80. """Enter function into the scheduler.
  81. :param entry: Item to enter.
  82. :keyword eta: Scheduled time as a :class:`datetime.datetime` object.
  83. :keyword priority: Unused.
  84. """
  85. if eta is None:
  86. eta = time()
  87. if isinstance(eta, datetime):
  88. try:
  89. eta = to_timestamp(eta)
  90. except Exception as exc:
  91. if not self.handle_error(exc):
  92. raise
  93. return
  94. return self._enter(eta, priority, entry)
  95. def _enter(self, eta, priority, entry):
  96. heapq.heappush(self._queue, (eta, priority, entry))
  97. return entry
  98. def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
  99. return self.enter(self.Entry(fun, args, kwargs), eta, priority)
  100. def enter_after(self, msecs, entry, priority=0):
  101. return self.enter(entry, time() + (msecs / 1000.0), priority)
  102. def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
  103. return self.enter_after(msecs, self.Entry(fun, args, kwargs), priority)
  104. def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
  105. tref = self.Entry(fun, args, kwargs)
  106. secs = msecs * 1000.0
  107. @wraps(fun)
  108. def _reschedules(*args, **kwargs):
  109. last, now = tref._last_run, time()
  110. lsince = (now - tref._last_run) * 1000.0 if last else msecs
  111. try:
  112. if lsince and lsince >= msecs:
  113. tref._last_run = now
  114. return fun(*args, **kwargs)
  115. finally:
  116. if not tref.cancelled:
  117. last = tref._last_run
  118. next = secs - (now - last) if last else secs
  119. self.enter_after(next / 1000.0, tref, priority)
  120. tref.fun = _reschedules
  121. tref._last_run = None
  122. return self.enter_after(msecs, tref, priority)
  123. @property
  124. def schedule(self):
  125. return self
  126. def __iter__(self):
  127. """The iterator yields the time to sleep for between runs."""
  128. # localize variable access
  129. nowfun = time
  130. pop = heapq.heappop
  131. max_interval = self.max_interval
  132. queue = self._queue
  133. while 1:
  134. if queue:
  135. eta, priority, entry = verify = queue[0]
  136. now = nowfun()
  137. if now < eta:
  138. yield min(eta - now, max_interval), None
  139. else:
  140. event = pop(queue)
  141. if event is verify:
  142. if not entry.cancelled:
  143. yield None, entry
  144. continue
  145. else:
  146. heapq.heappush(queue, event)
  147. else:
  148. yield None, None
  149. def empty(self):
  150. """Is the schedule empty?"""
  151. return not self._queue
  152. def clear(self):
  153. self._queue[:] = [] # used because we can't replace the object
  154. # and the operation is atomic.
  155. def info(self):
  156. return ({'eta': eta, 'priority': priority, 'item': item}
  157. for eta, priority, item in self.queue)
  158. def cancel(self, tref):
  159. tref.cancel()
  160. @property
  161. def queue(self):
  162. events = list(self._queue)
  163. return [heapq.heappop(x) for x in [events] * len(events)]
  164. class Timer(threading.Thread):
  165. Entry = Entry
  166. Schedule = Schedule
  167. running = False
  168. on_tick = None
  169. _timer_count = count(1)
  170. if TIMER_DEBUG: # pragma: no cover
  171. def start(self, *args, **kwargs):
  172. import traceback
  173. print('- Timer starting')
  174. traceback.print_stack()
  175. super(Timer, self).start(*args, **kwargs)
  176. def __init__(self, schedule=None, on_error=None, on_tick=None,
  177. max_interval=None, **kwargs):
  178. self.schedule = schedule or self.Schedule(on_error=on_error,
  179. max_interval=max_interval)
  180. self.on_tick = on_tick or self.on_tick
  181. threading.Thread.__init__(self)
  182. self._is_shutdown = threading.Event()
  183. self._is_stopped = threading.Event()
  184. self.mutex = threading.Lock()
  185. self.not_empty = threading.Condition(self.mutex)
  186. self.daemon = True
  187. self.name = 'Timer-{0}'.format(next(self._timer_count))
  188. def _next_entry(self):
  189. with self.not_empty:
  190. delay, entry = next(self.scheduler)
  191. if entry is None:
  192. if delay is None:
  193. self.not_empty.wait(1.0)
  194. return delay
  195. return self.schedule.apply_entry(entry)
  196. __next__ = next = _next_entry # for 2to3
  197. def run(self):
  198. try:
  199. self.running = True
  200. self.scheduler = iter(self.schedule)
  201. while not self._is_shutdown.isSet():
  202. delay = self._next_entry()
  203. if delay:
  204. if self.on_tick:
  205. self.on_tick(delay)
  206. if sleep is None: # pragma: no cover
  207. break
  208. sleep(delay)
  209. try:
  210. self._is_stopped.set()
  211. except TypeError: # pragma: no cover
  212. # we lost the race at interpreter shutdown,
  213. # so gc collected built-in modules.
  214. pass
  215. except Exception as exc:
  216. logger.error('Thread Timer crashed: %r', exc, exc_info=True)
  217. os._exit(1)
  218. def stop(self):
  219. if self.running:
  220. self._is_shutdown.set()
  221. self._is_stopped.wait()
  222. self.join(THREAD_TIMEOUT_MAX)
  223. self.running = False
  224. def ensure_started(self):
  225. if not self.running and not self.isAlive():
  226. self.start()
  227. def _do_enter(self, meth, *args, **kwargs):
  228. self.ensure_started()
  229. with self.mutex:
  230. entry = getattr(self.schedule, meth)(*args, **kwargs)
  231. self.not_empty.notify()
  232. return entry
  233. def enter(self, entry, eta, priority=None):
  234. return self._do_enter('enter', entry, eta, priority=priority)
  235. def apply_at(self, *args, **kwargs):
  236. return self._do_enter('apply_at', *args, **kwargs)
  237. def enter_after(self, *args, **kwargs):
  238. return self._do_enter('enter_after', *args, **kwargs)
  239. def apply_after(self, *args, **kwargs):
  240. return self._do_enter('apply_after', *args, **kwargs)
  241. def apply_interval(self, *args, **kwargs):
  242. return self._do_enter('apply_interval', *args, **kwargs)
  243. def exit_after(self, msecs, priority=10):
  244. self.apply_after(msecs, sys.exit, priority)
  245. def cancel(self, tref):
  246. tref.cancel()
  247. def clear(self):
  248. self.schedule.clear()
  249. def empty(self):
  250. return self.schedule.empty()
  251. @property
  252. def queue(self):
  253. return self.schedule.queue
  254. default_timer = _default_timer = Timer()
  255. apply_after = _default_timer.apply_after
  256. apply_at = _default_timer.apply_at
  257. apply_interval = _default_timer.apply_interval
  258. enter_after = _default_timer.enter_after
  259. enter = _default_timer.enter
  260. exit_after = _default_timer.exit_after
  261. cancel = _default_timer.cancel
  262. clear = _default_timer.clear
  263. atexit.register(_default_timer.stop)