timer2.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. # -*- coding: utf-8 -*-
  2. """Scheduler for Python functions.
  3. .. note::
  4. This is used for the thread-based worker only,
  5. not for amqp/redis/sqs/qpid where :mod:`kombu.async.timer` is used.
  6. """
  7. import os
  8. import sys
  9. import threading
  10. from itertools import count
  11. from time import sleep
  12. from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
  13. TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
  14. __all__ = ['Entry', 'Schedule', 'Timer', 'to_timestamp']
  15. class Timer(threading.Thread):
  16. Entry = Entry
  17. Schedule = Schedule
  18. running = False
  19. on_tick = None
  20. _timer_count = count(1)
  21. if TIMER_DEBUG: # pragma: no cover
  22. def start(self, *args, **kwargs):
  23. import traceback
  24. print('- Timer starting')
  25. traceback.print_stack()
  26. super().start(*args, **kwargs)
  27. def __init__(self, schedule=None, on_error=None, on_tick=None,
  28. on_start=None, max_interval=None, **kwargs):
  29. self.schedule = schedule or self.Schedule(on_error=on_error,
  30. max_interval=max_interval)
  31. self.on_start = on_start
  32. self.on_tick = on_tick or self.on_tick
  33. threading.Thread.__init__(self)
  34. self._is_shutdown = threading.Event()
  35. self._is_stopped = threading.Event()
  36. self.mutex = threading.Lock()
  37. self.not_empty = threading.Condition(self.mutex)
  38. self.daemon = True
  39. self.name = 'Timer-{0}'.format(next(self._timer_count))
  40. def _next_entry(self):
  41. with self.not_empty:
  42. delay, entry = next(self.scheduler)
  43. if entry is None:
  44. if delay is None:
  45. self.not_empty.wait(1.0)
  46. return delay
  47. return self.schedule.apply_entry(entry)
  48. __next__ = next = _next_entry # for 2to3
  49. def run(self):
  50. try:
  51. self.running = True
  52. self.scheduler = iter(self.schedule)
  53. while not self._is_shutdown.isSet():
  54. delay = self._next_entry()
  55. if delay:
  56. if self.on_tick:
  57. self.on_tick(delay)
  58. if sleep is None: # pragma: no cover
  59. break
  60. sleep(delay)
  61. try:
  62. self._is_stopped.set()
  63. except TypeError: # pragma: no cover
  64. # we lost the race at interpreter shutdown,
  65. # so gc collected built-in modules.
  66. pass
  67. except Exception as exc:
  68. logger.error('Thread Timer crashed: %r', exc, exc_info=True)
  69. os._exit(1)
  70. def stop(self):
  71. self._is_shutdown.set()
  72. if self.running:
  73. self._is_stopped.wait()
  74. self.join(threading.TIMEOUT_MAX)
  75. self.running = False
  76. def ensure_started(self):
  77. if not self.running and not self.isAlive():
  78. if self.on_start:
  79. self.on_start(self)
  80. self.start()
  81. def _do_enter(self, meth, *args, **kwargs):
  82. self.ensure_started()
  83. with self.mutex:
  84. entry = getattr(self.schedule, meth)(*args, **kwargs)
  85. self.not_empty.notify()
  86. return entry
  87. def enter(self, entry, eta, priority=None):
  88. return self._do_enter('enter_at', entry, eta, priority=priority)
  89. def call_at(self, *args, **kwargs):
  90. return self._do_enter('call_at', *args, **kwargs)
  91. def enter_after(self, *args, **kwargs):
  92. return self._do_enter('enter_after', *args, **kwargs)
  93. def call_after(self, *args, **kwargs):
  94. return self._do_enter('call_after', *args, **kwargs)
  95. def call_repeatedly(self, *args, **kwargs):
  96. return self._do_enter('call_repeatedly', *args, **kwargs)
  97. def exit_after(self, secs, priority=10):
  98. self.call_after(secs, sys.exit, priority)
  99. def cancel(self, tref):
  100. tref.cancel()
  101. def clear(self):
  102. self.schedule.clear()
  103. def empty(self):
  104. return not len(self)
  105. def __len__(self):
  106. return len(self.schedule)
  107. def __bool__(self):
  108. return True
  109. __nonzero__ = __bool__
  110. @property
  111. def queue(self):
  112. return self.schedule.queue