timer2.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # -*- coding: utf-8 -*-
  2. """
  3. timer2
  4. ~~~~~~
  5. Scheduler for Python functions.
  6. """
  7. from __future__ import absolute_import
  8. import os
  9. import sys
  10. import threading
  11. from itertools import count
  12. from time import sleep
  13. from celery.five import THREAD_TIMEOUT_MAX
  14. from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
  15. TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
  16. __all__ = ['Entry', 'Schedule', 'Timer', 'to_timestamp']
  17. class Timer(threading.Thread):
  18. Entry = Entry
  19. Schedule = Schedule
  20. running = False
  21. on_tick = None
  22. _timer_count = count(1)
  23. if TIMER_DEBUG: # pragma: no cover
  24. def start(self, *args, **kwargs):
  25. import traceback
  26. print('- Timer starting')
  27. traceback.print_stack()
  28. super(Timer, self).start(*args, **kwargs)
  29. def __init__(self, schedule=None, on_error=None, on_tick=None,
  30. on_start=None, max_interval=None, **kwargs):
  31. self.schedule = schedule or self.Schedule(on_error=on_error,
  32. max_interval=max_interval)
  33. self.on_start = on_start
  34. self.on_tick = on_tick or self.on_tick
  35. threading.Thread.__init__(self)
  36. self._is_shutdown = threading.Event()
  37. self._is_stopped = threading.Event()
  38. self.mutex = threading.Lock()
  39. self.not_empty = threading.Condition(self.mutex)
  40. self.daemon = True
  41. self.name = 'Timer-{0}'.format(next(self._timer_count))
  42. def _next_entry(self):
  43. with self.not_empty:
  44. delay, entry = next(self.scheduler)
  45. if entry is None:
  46. if delay is None:
  47. self.not_empty.wait(1.0)
  48. return delay
  49. return self.schedule.apply_entry(entry)
  50. __next__ = next = _next_entry # for 2to3
  51. def run(self):
  52. try:
  53. self.running = True
  54. self.scheduler = iter(self.schedule)
  55. while not self._is_shutdown.isSet():
  56. delay = self._next_entry()
  57. if delay:
  58. if self.on_tick:
  59. self.on_tick(delay)
  60. if sleep is None: # pragma: no cover
  61. break
  62. sleep(delay)
  63. try:
  64. self._is_stopped.set()
  65. except TypeError: # pragma: no cover
  66. # we lost the race at interpreter shutdown,
  67. # so gc collected built-in modules.
  68. pass
  69. except Exception as exc:
  70. logger.error('Thread Timer crashed: %r', exc, exc_info=True)
  71. os._exit(1)
  72. def stop(self):
  73. if self.running:
  74. self._is_shutdown.set()
  75. self._is_stopped.wait()
  76. self.join(THREAD_TIMEOUT_MAX)
  77. self.running = False
  78. def ensure_started(self):
  79. if not self.running and not self.isAlive():
  80. if self.on_start:
  81. self.on_start(self)
  82. self.start()
  83. def _do_enter(self, meth, *args, **kwargs):
  84. self.ensure_started()
  85. with self.mutex:
  86. entry = getattr(self.schedule, meth)(*args, **kwargs)
  87. self.not_empty.notify()
  88. return entry
  89. def enter(self, entry, eta, priority=None):
  90. return self._do_enter('enter_at', entry, eta, priority=priority)
  91. def call_at(self, *args, **kwargs):
  92. return self._do_enter('call_at', *args, **kwargs)
  93. def enter_after(self, *args, **kwargs):
  94. return self._do_enter('enter_after', *args, **kwargs)
  95. def call_after(self, *args, **kwargs):
  96. return self._do_enter('call_after', *args, **kwargs)
  97. def call_repeatedly(self, *args, **kwargs):
  98. return self._do_enter('call_repeatedly', *args, **kwargs)
  99. def exit_after(self, secs, priority=10):
  100. self.call_after(secs, sys.exit, priority)
  101. def cancel(self, tref):
  102. tref.cancel()
  103. def clear(self):
  104. self.schedule.clear()
  105. def empty(self):
  106. return not len(self)
  107. def __len__(self):
  108. return len(self.schedule)
  109. def __nonzero__(self):
  110. return True
  111. @property
  112. def queue(self):
  113. return self.schedule.queue