123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- # -*- coding: utf-8 -*-
- """
- timer2
- ~~~~~~
- Scheduler for Python functions.
- """
- from __future__ import absolute_import
- import os
- import sys
- import threading
- from itertools import count
- from time import sleep
- from celery.five import THREAD_TIMEOUT_MAX
- from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
- TIMER_DEBUG = os.environ.get('TIMER_DEBUG')  # when set to a non-empty value, Timer.start() is overridden to print a stack trace
- __all__ = ['Entry', 'Schedule', 'Timer', 'to_timestamp']  # public names exported by this module
class Timer(threading.Thread):
    """Daemon thread that drives a schedule of timed entries.

    The thread iterates over ``self.schedule``; each iteration yields a
    ``(delay, entry)`` pair.  When an entry is yielded it is applied via
    ``schedule.apply_entry``; when only a delay is yielded the thread
    calls ``on_tick(delay)`` (if set) and sleeps for that long.

    The thread is started lazily: the first call that schedules something
    (``enter``, ``call_at``, ``call_after``, ...) starts it.
    """

    Entry = Entry
    Schedule = Schedule

    running = False
    on_tick = None

    # Shared counter used to give every Timer thread a unique name.
    _timer_count = count(1)

    if TIMER_DEBUG:  # pragma: no cover
        def start(self, *args, **kwargs):
            """Debug variant of ``start`` that prints the caller's stack."""
            import traceback
            print('- Timer starting')
            traceback.print_stack()
            super(Timer, self).start(*args, **kwargs)

    def __init__(self, schedule=None, on_error=None, on_tick=None,
                 on_start=None, max_interval=None, **kwargs):
        """Create the timer thread (it is not started yet).

        :param schedule: schedule object to drive; when falsy a new
            ``self.Schedule(on_error=..., max_interval=...)`` is created.
        :param on_error: forwarded to ``Schedule`` when one is created.
        :param on_tick: optional callable invoked with the delay before
            each sleep; defaults to the class attribute ``on_tick``.
        :param on_start: optional callable invoked with this timer right
            before the thread is started by :meth:`ensure_started`.
        :param max_interval: forwarded to ``Schedule`` when one is created.
        :param kwargs: accepted and ignored.
        """
        self.schedule = schedule or self.Schedule(on_error=on_error,
                                                  max_interval=max_interval)
        self.on_start = on_start
        self.on_tick = on_tick or self.on_tick
        threading.Thread.__init__(self)
        self._is_shutdown = threading.Event()
        self._is_stopped = threading.Event()
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.daemon = True
        self.name = 'Timer-{0}'.format(next(self._timer_count))

    def _next_entry(self):
        """Advance the schedule by one step.

        Returns the delay when no entry was yielded (after waiting up to
        one second on ``not_empty`` if the delay is ``None`` as well),
        otherwise the result of ``schedule.apply_entry(entry)``.
        """
        with self.not_empty:
            delay, entry = next(self.scheduler)
            if entry is None:
                if delay is None:
                    # Neither an entry nor a delay: block until
                    # ``_do_enter`` notifies us, or one second passes.
                    self.not_empty.wait(1.0)
                return delay
        # Applied outside the lock: ``mutex`` is a non-reentrant Lock that
        # ``_do_enter`` also takes, so a callback scheduling new work while
        # the lock is held would deadlock.
        return self.schedule.apply_entry(entry)
    __next__ = next = _next_entry  # for 2to3

    def run(self):
        """Thread body: step the schedule until shutdown is requested.

        Any exception escaping the loop is logged and the whole process
        is terminated with ``os._exit(1)``.
        """
        try:
            self.running = True
            self.scheduler = iter(self.schedule)

            while not self._is_shutdown.is_set():
                delay = self._next_entry()
                if delay:
                    if self.on_tick:
                        self.on_tick(delay)
                    if sleep is None:  # pragma: no cover
                        break
                    sleep(delay)
            try:
                self._is_stopped.set()
            except TypeError:  # pragma: no cover
                # we lost the race at interpreter shutdown,
                # so gc collected built-in modules.
                pass
        except Exception as exc:
            logger.error('Thread Timer crashed: %r', exc, exc_info=True)
            os._exit(1)

    def stop(self):
        """Request shutdown and wait for the thread to finish.

        Does nothing unless ``running`` is true.
        """
        if self.running:
            self._is_shutdown.set()
            self._is_stopped.wait()
            self.join(THREAD_TIMEOUT_MAX)
            self.running = False

    def ensure_started(self):
        """Start the thread (calling ``on_start`` first) if not running."""
        # ``is_alive`` replaces the camelCase alias ``isAlive``, which was
        # removed from ``threading.Thread`` in Python 3.9.
        if not self.running and not self.is_alive():
            if self.on_start:
                self.on_start(self)
            self.start()

    def _do_enter(self, meth, *args, **kwargs):
        """Call ``self.schedule.<meth>`` under the lock and wake the thread.

        Makes sure the thread is started first and returns whatever the
        schedule method returns.
        """
        self.ensure_started()
        with self.mutex:
            entry = getattr(self.schedule, meth)(*args, **kwargs)
            self.not_empty.notify()
            return entry

    def enter(self, entry, eta, priority=None):
        """Schedule ``entry`` through the schedule's ``enter_at``."""
        return self._do_enter('enter_at', entry, eta, priority=priority)

    def call_at(self, *args, **kwargs):
        """Forward to the schedule's ``call_at``."""
        return self._do_enter('call_at', *args, **kwargs)

    def enter_after(self, *args, **kwargs):
        """Forward to the schedule's ``enter_after``."""
        return self._do_enter('enter_after', *args, **kwargs)

    def call_after(self, *args, **kwargs):
        """Forward to the schedule's ``call_after``."""
        return self._do_enter('call_after', *args, **kwargs)

    def call_repeatedly(self, *args, **kwargs):
        """Forward to the schedule's ``call_repeatedly``."""
        return self._do_enter('call_repeatedly', *args, **kwargs)

    def exit_after(self, secs, priority=10):
        """Schedule ``sys.exit`` via :meth:`call_after` with ``secs``."""
        # Passed by keyword, as ``enter`` does.
        # NOTE(review): kombu's ``call_after`` is
        # ``(secs, fun, args=(), kwargs={}, priority=0)``, so a positional
        # third argument would be taken as the callback's ``args`` --
        # confirm against the kombu version in use.
        self.call_after(secs, sys.exit, priority=priority)

    def cancel(self, tref):
        """Cancel a scheduled entry by calling ``tref.cancel()``."""
        tref.cancel()

    def clear(self):
        """Clear the underlying schedule."""
        self.schedule.clear()

    def empty(self):
        """Return true when the schedule holds no entries."""
        return not len(self)

    def __len__(self):
        return len(self.schedule)

    def __bool__(self):
        # A Timer is always truthy.  Without this, Python 3 would fall
        # back to ``__len__`` and treat an empty timer as false.
        return True
    __nonzero__ = __bool__  # Python 2 name for the same hook

    @property
    def queue(self):
        """The underlying schedule's ``queue`` attribute."""
        return self.schedule.queue
|