timer2.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # -*- coding: utf-8 -*-
  2. """Scheduler for Python functions.
  3. .. note::
  4. This is used for the thread-based worker only,
  5. not for amqp/redis/sqs/qpid where :mod:`kombu.async.timer` is used.
  6. """
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. import os
  9. import sys
  10. import threading
  11. from itertools import count
  12. from time import sleep
  13. from celery.five import THREAD_TIMEOUT_MAX
  14. from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
  15. TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
  16. __all__ = ['Entry', 'Schedule', 'Timer', 'to_timestamp']
  17. class Timer(threading.Thread):
  18. """Timer thread.
  19. Note:
  20. This is only used for transports not supporting AsyncIO.
  21. """
  22. Entry = Entry
  23. Schedule = Schedule
  24. running = False
  25. on_tick = None
  26. _timer_count = count(1)
  27. if TIMER_DEBUG: # pragma: no cover
  28. def start(self, *args, **kwargs):
  29. import traceback
  30. print('- Timer starting')
  31. traceback.print_stack()
  32. super(Timer, self).start(*args, **kwargs)
  33. def __init__(self, schedule=None, on_error=None, on_tick=None,
  34. on_start=None, max_interval=None, **kwargs):
  35. self.schedule = schedule or self.Schedule(on_error=on_error,
  36. max_interval=max_interval)
  37. self.on_start = on_start
  38. self.on_tick = on_tick or self.on_tick
  39. threading.Thread.__init__(self)
  40. self._is_shutdown = threading.Event()
  41. self._is_stopped = threading.Event()
  42. self.mutex = threading.Lock()
  43. self.not_empty = threading.Condition(self.mutex)
  44. self.daemon = True
  45. self.name = 'Timer-{0}'.format(next(self._timer_count))
  46. def _next_entry(self):
  47. with self.not_empty:
  48. delay, entry = next(self.scheduler)
  49. if entry is None:
  50. if delay is None:
  51. self.not_empty.wait(1.0)
  52. return delay
  53. return self.schedule.apply_entry(entry)
  54. __next__ = next = _next_entry # for 2to3
  55. def run(self):
  56. try:
  57. self.running = True
  58. self.scheduler = iter(self.schedule)
  59. while not self._is_shutdown.isSet():
  60. delay = self._next_entry()
  61. if delay:
  62. if self.on_tick:
  63. self.on_tick(delay)
  64. if sleep is None: # pragma: no cover
  65. break
  66. sleep(delay)
  67. try:
  68. self._is_stopped.set()
  69. except TypeError: # pragma: no cover
  70. # we lost the race at interpreter shutdown,
  71. # so gc collected built-in modules.
  72. pass
  73. except Exception as exc:
  74. logger.error('Thread Timer crashed: %r', exc, exc_info=True)
  75. os._exit(1)
  76. def stop(self):
  77. self._is_shutdown.set()
  78. if self.running:
  79. self._is_stopped.wait()
  80. self.join(THREAD_TIMEOUT_MAX)
  81. self.running = False
  82. def ensure_started(self):
  83. if not self.running and not self.isAlive():
  84. if self.on_start:
  85. self.on_start(self)
  86. self.start()
  87. def _do_enter(self, meth, *args, **kwargs):
  88. self.ensure_started()
  89. with self.mutex:
  90. entry = getattr(self.schedule, meth)(*args, **kwargs)
  91. self.not_empty.notify()
  92. return entry
  93. def enter(self, entry, eta, priority=None):
  94. return self._do_enter('enter_at', entry, eta, priority=priority)
  95. def call_at(self, *args, **kwargs):
  96. return self._do_enter('call_at', *args, **kwargs)
  97. def enter_after(self, *args, **kwargs):
  98. return self._do_enter('enter_after', *args, **kwargs)
  99. def call_after(self, *args, **kwargs):
  100. return self._do_enter('call_after', *args, **kwargs)
  101. def call_repeatedly(self, *args, **kwargs):
  102. return self._do_enter('call_repeatedly', *args, **kwargs)
  103. def exit_after(self, secs, priority=10):
  104. self.call_after(secs, sys.exit, priority)
  105. def cancel(self, tref):
  106. tref.cancel()
  107. def clear(self):
  108. self.schedule.clear()
  109. def empty(self):
  110. return not len(self)
  111. def __len__(self):
  112. return len(self.schedule)
  113. def __bool__(self):
  114. """``bool(timer)``."""
  115. return True
  116. __nonzero__ = __bool__
  117. @property
  118. def queue(self):
  119. return self.schedule.queue