| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544 | # -*- coding: utf-8 -*-"""    celery.events.cursesmon    ~~~~~~~~~~~~~~~~~~~~~~~    Graphical monitor of Celery events using curses."""from __future__ import absolute_import, print_functionimport cursesimport sysimport threadingimport timefrom datetime import datetimefrom itertools import countfrom textwrap import wrapfrom math import ceilfrom celery import VERSION_BANNERfrom celery import statesfrom celery.app import app_or_defaultfrom celery.five import items, valuesfrom celery.utils.text import abbr, abbrtask__all__ = ['CursesMonitor', 'evtop']BORDER_SPACING = 4LEFT_BORDER_OFFSET = 3UUID_WIDTH = 36STATE_WIDTH = 8TIMESTAMP_WIDTH = 8MIN_WORKER_WIDTH = 15MIN_TASK_WIDTH = 16# this module is considered experimental# we don't care about coverage.STATUS_SCREEN = """\events: {s.event_count} tasks:{s.task_count} workers:{w_alive}/{w_all}"""class CursesMonitor(object):  # pragma: no cover    keymap = {}    win = None    screen_width = None    screen_delay = 10    selected_task = None    selected_position = 0    selected_str = 'Selected: '    foreground = curses.COLOR_BLACK    background = curses.COLOR_WHITE    online_str = 'Workers online: '    help_title = 'Keys: '    help = ('j:down k:up i:info t:traceback r:result c:revoke ^c: quit')    greet = 'celery events {0}'.format(VERSION_BANNER)    info_str = 'Info: '    def __init__(self, state, app, keymap=None):        self.app = app        self.keymap = keymap or self.keymap        self.state = state        default_keymap = {'J': self.move_selection_down,                          'K': self.move_selection_up,                          'C': self.revoke_selection,                          'T': self.selection_traceback,                          'R': self.selection_result,                          'I': self.selection_info,                          'L': self.selection_rate_limit}        self.keymap = dict(default_keymap, **self.keymap)        self.lock = threading.RLock()    def format_row(self, uuid, task, worker, timestamp, state):        mx = self.display_width        # include spacing        detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH        uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH        if uuid_space < UUID_WIDTH:            uuid_width = uuid_space        else:            uuid_width = UUID_WIDTH        detail_width = detail_width - uuid_width - 1        task_width = int(ceil(detail_width / 2.0))        worker_width = detail_width - task_width - 1        uuid = abbr(uuid, uuid_width).ljust(uuid_width)        worker = abbr(worker, worker_width).ljust(worker_width)        task = abbrtask(task, task_width).ljust(task_width)        state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)        timestamp = timestamp.ljust(TIMESTAMP_WIDTH)        row = '{0} {1} {2} {3} {4} '.format(uuid, worker, task,                                            timestamp, state)        if self.screen_width is None:            self.screen_width = len(row[:mx])        return row[:mx]    @property    def screen_width(self):        _, mx = self.win.getmaxyx()        return mx    @property    def screen_height(self):        my, _ = self.win.getmaxyx()        return my    @property    def display_width(self):        _, mx = self.win.getmaxyx()        return mx - BORDER_SPACING    @property    def display_height(self):        my, _ = self.win.getmaxyx()        return my - 10    @property    def limit(self):        return self.display_height    def find_position(self):        if not self.tasks:            return 0        for i, e in enumerate(self.tasks):            if self.selected_task == e[0]:                return i        return 0    def move_selection_up(self):        self.move_selection(-1)    def move_selection_down(self):        self.move_selection(1)    def move_selection(self, direction=1):        if not self.tasks:            return        pos = self.find_position()        try:            self.selected_task = self.tasks[pos + direction][0]        except IndexError:            self.selected_task = self.tasks[0][0]    keyalias = {curses.KEY_DOWN: 'J',                curses.KEY_UP: 'K',                curses.KEY_ENTER: 'I'}    def handle_keypress(self):        try:            key = self.win.getkey().upper()        except:            return        key = self.keyalias.get(key) or key        handler = self.keymap.get(key)        if handler is not None:            handler()    def alert(self, callback, title=None):        self.win.erase()        my, mx = self.win.getmaxyx()        y = blank_line = count(2)        if title:            self.win.addstr(next(y), 3, title,                            curses.A_BOLD | curses.A_UNDERLINE)            next(blank_line)        callback(my, mx, next(y))        self.win.addstr(my - 1, 0, 'Press any key to continue...',                        curses.A_BOLD)        self.win.refresh()        while 1:            try:                return self.win.getkey().upper()            except:                pass    def selection_rate_limit(self):        if not self.selected_task:            return curses.beep()        task = self.state.tasks[self.selected_task]        if not task.name:            return curses.beep()        my, mx = self.win.getmaxyx()        r = 'New rate limit: '        self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)        self.win.addstr(my - 2, len(r) + 3, ' ' * (mx - len(r)))        rlimit = self.readline(my - 2, 3 + len(r))        if rlimit:            reply = self.app.control.rate_limit(task.name,                                                rlimit.strip(), reply=True)            self.alert_remote_control_reply(reply)    def alert_remote_control_reply(self, reply):        def callback(my, mx, xs):            y = count(xs)            if not reply:                self.win.addstr(                    next(y), 3, 'No replies received in 1s deadline.',                    curses.A_BOLD + curses.color_pair(2),                )                return            for subreply in reply:                curline = next(y)                host, response = next(items(subreply))                host = '{0}: '.format(host)                self.win.addstr(curline, 3, host, curses.A_BOLD)                attr = curses.A_NORMAL                text = ''                if 'error' in response:                    text = response['error']                    attr |= curses.color_pair(2)                elif 'ok' in response:                    text = response['ok']                    attr |= curses.color_pair(3)                self.win.addstr(curline, 3 + len(host), text, attr)        return self.alert(callback, 'Remote Control Command Replies')    def readline(self, x, y):        buffer = str()        curses.echo()        try:            i = 0            while 1:                ch = self.win.getch(x, y + i)                if ch != -1:                    if ch in (10, curses.KEY_ENTER):            # enter                        break                    if ch in (27, ):                        buffer = str()                        break                    buffer += chr(ch)                    i += 1        finally:            curses.noecho()        return buffer    def revoke_selection(self):        if not self.selected_task:            return curses.beep()        reply = self.app.control.revoke(self.selected_task, reply=True)        self.alert_remote_control_reply(reply)    def selection_info(self):        if not self.selected_task:            return        def alert_callback(mx, my, xs):            my, mx = self.win.getmaxyx()            y = count(xs)            task = self.state.tasks[self.selected_task]            info = task.info(extra=['state'])            infoitems = [                ('args', info.pop('args', None)),                ('kwargs', info.pop('kwargs', None))            ] + list(info.items())            for key, value in infoitems:                if key is None:                    continue                value = str(value)                curline = next(y)                keys = key + ': '                self.win.addstr(curline, 3, keys, curses.A_BOLD)                wrapped = wrap(value, mx - 2)                if len(wrapped) == 1:                    self.win.addstr(                        curline, len(keys) + 3,                        abbr(wrapped[0],                             self.screen_width - (len(keys) + 3)))                else:                    for subline in wrapped:                        nexty = next(y)                        if nexty >= my - 1:                            subline = ' ' * 4 + '[...]'                        elif nexty >= my:                            break                        self.win.addstr(                            nexty, 3,                            abbr(' ' * 4 + subline, self.screen_width - 4),                            curses.A_NORMAL,                        )        return self.alert(            alert_callback, 'Task details for {0.selected_task}'.format(self),        )    def selection_traceback(self):        if not self.selected_task:            return curses.beep()        task = self.state.tasks[self.selected_task]        if task.state not in states.EXCEPTION_STATES:            return curses.beep()        def alert_callback(my, mx, xs):            y = count(xs)            for line in task.traceback.split('\n'):                self.win.addstr(next(y), 3, line)        return self.alert(            alert_callback,            'Task Exception Traceback for {0.selected_task}'.format(self),        )    def selection_result(self):        if not self.selected_task:            return        def alert_callback(my, mx, xs):            y = count(xs)            task = self.state.tasks[self.selected_task]            result = (getattr(task, 'result', None)                      or getattr(task, 'exception', None))            for line in wrap(result, mx - 2):                self.win.addstr(next(y), 3, line)        return self.alert(            alert_callback,            'Task Result for {0.selected_task}'.format(self),        )    def display_task_row(self, lineno, task):        state_color = self.state_colors.get(task.state)        attr = curses.A_NORMAL        if task.uuid == self.selected_task:            attr = curses.A_STANDOUT        timestamp = datetime.utcfromtimestamp(            task.timestamp or time.time(),        )        timef = timestamp.strftime('%H:%M:%S')        hostname = task.worker.hostname if task.worker else '*NONE*'        line = self.format_row(task.uuid, task.name,                               hostname,                               timef, task.state)        self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)        if state_color:            self.win.addstr(lineno,                            len(line) - STATE_WIDTH + BORDER_SPACING - 1,                            task.state, state_color | attr)    def draw(self):        with self.lock:            win = self.win            self.handle_keypress()            x = LEFT_BORDER_OFFSET            y = blank_line = count(2)            my, mx = win.getmaxyx()            win.erase()            win.bkgd(' ', curses.color_pair(1))            win.border()            win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))            next(blank_line)            win.addstr(next(y), x, self.format_row('UUID', 'TASK',                                                   'WORKER', 'TIME', 'STATE'),                       curses.A_BOLD | curses.A_UNDERLINE)            tasks = self.tasks            if tasks:                for row, (uuid, task) in enumerate(tasks):                    if row > self.display_height:                        break                    if task.uuid:                        lineno = next(y)                    self.display_task_row(lineno, task)            # -- Footer            next(blank_line)            win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)            # Selected Task Info            if self.selected_task:                win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)                info = 'Missing extended info'                detail = ''                try:                    selection = self.state.tasks[self.selected_task]                except KeyError:                    pass                else:                    info = selection.info()                    if 'runtime' in info:                        info['runtime'] = '{0:.2f}'.format(info['runtime'])                    if 'result' in info:                        info['result'] = abbr(info['result'], 16)                    info = ' '.join(                        '{0}={1}'.format(key, value)                        for key, value in items(info)                    )                    detail = '... -> key i'                infowin = abbr(info,                               self.screen_width - len(self.selected_str) - 2,                               detail)                win.addstr(my - 5, x + len(self.selected_str), infowin)                # Make ellipsis bold                if detail in infowin:                    detailpos = len(infowin) - len(detail)                    win.addstr(my - 5, x + len(self.selected_str) + detailpos,                               detail, curses.A_BOLD)            else:                win.addstr(my - 5, x, 'No task selected', curses.A_NORMAL)            # Workers            if self.workers:                win.addstr(my - 4, x, self.online_str, curses.A_BOLD)                win.addstr(my - 4, x + len(self.online_str),                           ', '.join(sorted(self.workers)), curses.A_NORMAL)            else:                win.addstr(my - 4, x, 'No workers discovered.')            # Info            win.addstr(my - 3, x, self.info_str, curses.A_BOLD)            win.addstr(                my - 3, x + len(self.info_str),                STATUS_SCREEN.format(                    s=self.state,                    w_alive=len([w for w in values(self.state.workers)                                if w.alive]),                    w_all=len(self.state.workers),                ),                curses.A_DIM,            )            # Help            self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)            self.safe_add_str(my - 2, x + len(self.help_title), self.help,                              curses.A_DIM)            win.refresh()    def safe_add_str(self, y, x, string, *args, **kwargs):        if x + len(string) > self.screen_width:            string = string[:self.screen_width - x]        self.win.addstr(y, x, string, *args, **kwargs)    def init_screen(self):        with self.lock:            self.win = curses.initscr()            self.win.nodelay(True)            self.win.keypad(True)            curses.start_color()            curses.init_pair(1, self.foreground, self.background)            # exception states            curses.init_pair(2, curses.COLOR_RED, self.background)            # successful state            curses.init_pair(3, curses.COLOR_GREEN, self.background)            # revoked state            curses.init_pair(4, curses.COLOR_MAGENTA, self.background)            # greeting            curses.init_pair(5, curses.COLOR_BLUE, self.background)            # started state            curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)            self.state_colors = {states.SUCCESS: curses.color_pair(3),                                 states.REVOKED: curses.color_pair(4),                                 states.STARTED: curses.color_pair(6)}            for state in states.EXCEPTION_STATES:                self.state_colors[state] = curses.color_pair(2)            curses.cbreak()    def resetscreen(self):        with self.lock:            curses.nocbreak()            self.win.keypad(False)            curses.echo()            curses.endwin()    def nap(self):        curses.napms(self.screen_delay)    @property    def tasks(self):        return list(self.state.tasks_by_time(limit=self.limit))    @property    def workers(self):        return [hostname for hostname, w in items(self.state.workers)                if w.alive]class DisplayThread(threading.Thread):  # pragma: no cover    def __init__(self, display):        self.display = display        self.shutdown = False        threading.Thread.__init__(self)    def run(self):        while not self.shutdown:            self.display.draw()            self.display.nap()def capture_events(app, state, display):  # pragma: no cover    def on_connection_error(exc, interval):        print('Connection Error: {0!r}. Retry in {1}s.'.format(            exc, interval), file=sys.stderr)    while 1:        print('-> evtop: starting capture...', file=sys.stderr)        with app.connection() as conn:            try:                conn.ensure_connection(on_connection_error,                                       app.conf.BROKER_CONNECTION_MAX_RETRIES)                recv = app.events.Receiver(conn, handlers={'*': state.event})                display.resetscreen()                display.init_screen()                recv.capture()            except conn.connection_errors + conn.channel_errors as exc:                print('Connection lost: {0!r}'.format(exc), file=sys.stderr)def evtop(app=None):  # pragma: no cover    app = app_or_default(app)    state = app.events.State()    display = CursesMonitor(state, app)    display.init_screen()    refresher = DisplayThread(display)    refresher.start()    try:        capture_events(app, state, display)    except Exception:        refresher.shutdown = True        refresher.join()        display.resetscreen()        raise    except (KeyboardInterrupt, SystemExit):        refresher.shutdown = True        refresher.join()        display.resetscreen()if __name__ == '__main__':  # pragma: no cover    evtop()
 |