123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522 |
- # -*- coding: utf-8 -*-
- """
- celery.events.cursesmon
- ~~~~~~~~~~~~~~~~~~~~~~~
- Graphical monitor of Celery events using curses.
- """
- from __future__ import absolute_import, print_function
- import curses
- import sys
- import threading
- import time
- from datetime import datetime
- from itertools import count
- from textwrap import wrap
- from math import ceil
- from celery import VERSION_BANNER
- from celery import states
- from celery.app import app_or_default
- from celery.five import items, values
- from celery.utils.text import abbr, abbrtask
- BORDER_SPACING = 4
- LEFT_BORDER_OFFSET = 3
- UUID_WIDTH = 36
- STATE_WIDTH = 8
- TIMESTAMP_WIDTH = 8
- MIN_WORKER_WIDTH = 15
- MIN_TASK_WIDTH = 16
- # this module is considered experimental
- # we don't care about coverage.
- STATUS_SCREEN = """\
- events: {s.event_count} tasks:{s.task_count} workers:{w_alive}/{w_all}
- """
- class CursesMonitor(object): # pragma: no cover
- keymap = {}
- win = None
- screen_width = None
- screen_delay = 10
- selected_task = None
- selected_position = 0
- selected_str = 'Selected: '
- foreground = curses.COLOR_BLACK
- background = curses.COLOR_WHITE
- online_str = 'Workers online: '
- help_title = 'Keys: '
- help = ('j:up k:down i:info t:traceback r:result c:revoke ^c: quit')
- greet = 'celery events {0}'.format(VERSION_BANNER)
- info_str = 'Info: '
- def __init__(self, state, keymap=None, app=None):
- self.app = app_or_default(app)
- self.keymap = keymap or self.keymap
- self.state = state
- default_keymap = {'J': self.move_selection_down,
- 'K': self.move_selection_up,
- 'C': self.revoke_selection,
- 'T': self.selection_traceback,
- 'R': self.selection_result,
- 'I': self.selection_info,
- 'L': self.selection_rate_limit}
- self.keymap = dict(default_keymap, **self.keymap)
- def format_row(self, uuid, task, worker, timestamp, state):
- mx = self.display_width
- # include spacing
- detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
- uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH
- if uuid_space < UUID_WIDTH:
- uuid_width = uuid_space
- else:
- uuid_width = UUID_WIDTH
- detail_width = detail_width - uuid_width - 1
- task_width = int(ceil(detail_width / 2.0))
- worker_width = detail_width - task_width - 1
- uuid = abbr(uuid, uuid_width).ljust(uuid_width)
- worker = abbr(worker, worker_width).ljust(worker_width)
- task = abbrtask(task, task_width).ljust(task_width)
- state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
- timestamp = timestamp.ljust(TIMESTAMP_WIDTH)
- row = '{0} {1} {2} {3} {4} '.format(uuid, worker, task,
- timestamp, state)
- if self.screen_width is None:
- self.screen_width = len(row[:mx])
- return row[:mx]
- @property
- def screen_width(self):
- _, mx = self.win.getmaxyx()
- return mx
- @property
- def screen_height(self):
- my, _ = self.win.getmaxyx()
- return my
- @property
- def display_width(self):
- _, mx = self.win.getmaxyx()
- return mx - BORDER_SPACING
- @property
- def display_height(self):
- my, _ = self.win.getmaxyx()
- return my - 10
- @property
- def limit(self):
- return self.display_height
- def find_position(self):
- if not self.tasks:
- return 0
- for i, e in enumerate(self.tasks):
- if self.selected_task == e[0]:
- return i
- return 0
- def move_selection_up(self):
- self.move_selection(-1)
- def move_selection_down(self):
- self.move_selection(1)
- def move_selection(self, direction=1):
- if not self.tasks:
- return
- pos = self.find_position()
- try:
- self.selected_task = self.tasks[pos + direction][0]
- except IndexError:
- self.selected_task = self.tasks[0][0]
- keyalias = {curses.KEY_DOWN: 'J',
- curses.KEY_UP: 'K',
- curses.KEY_ENTER: 'I'}
- def handle_keypress(self):
- try:
- key = self.win.getkey().upper()
- except:
- return
- key = self.keyalias.get(key) or key
- handler = self.keymap.get(key)
- if handler is not None:
- handler()
- def alert(self, callback, title=None):
- self.win.erase()
- my, mx = self.win.getmaxyx()
- y = blank_line = count(2)
- if title:
- self.win.addstr(next(y), 3, title,
- curses.A_BOLD | curses.A_UNDERLINE)
- next(blank_line)
- callback(my, mx, next(y))
- self.win.addstr(my - 1, 0, 'Press any key to continue...',
- curses.A_BOLD)
- self.win.refresh()
- while 1:
- try:
- return self.win.getkey().upper()
- except:
- pass
- def selection_rate_limit(self):
- if not self.selected_task:
- return curses.beep()
- task = self.state.tasks[self.selected_task]
- if not task.name:
- return curses.beep()
- my, mx = self.win.getmaxyx()
- r = 'New rate limit: '
- self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
- self.win.addstr(my - 2, len(r) + 3, ' ' * (mx - len(r)))
- rlimit = self.readline(my - 2, 3 + len(r))
- if rlimit:
- reply = self.app.control.rate_limit(task.name,
- rlimit.strip(), reply=True)
- self.alert_remote_control_reply(reply)
- def alert_remote_control_reply(self, reply):
- def callback(my, mx, xs):
- y = count(xs)
- if not reply:
- self.win.addstr(next(y), 3,
- 'No replies received in 1s deadline.',
- curses.A_BOLD + curses.color_pair(2))
- return
- for subreply in reply:
- curline = next(y)
- host, response = next(items(subreply))
- host = '{0}: '.format(host)
- self.win.addstr(curline, 3, host, curses.A_BOLD)
- attr = curses.A_NORMAL
- text = ''
- if 'error' in response:
- text = response['error']
- attr |= curses.color_pair(2)
- elif 'ok' in response:
- text = response['ok']
- attr |= curses.color_pair(3)
- self.win.addstr(curline, 3 + len(host), text, attr)
- return self.alert(callback, 'Remote Control Command Replies')
- def readline(self, x, y):
- buffer = str()
- curses.echo()
- try:
- i = 0
- while 1:
- ch = self.win.getch(x, y + i)
- if ch != -1:
- if ch in (10, curses.KEY_ENTER): # enter
- break
- if ch in (27, ):
- buffer = str()
- break
- buffer += chr(ch)
- i += 1
- finally:
- curses.noecho()
- return buffer
- def revoke_selection(self):
- if not self.selected_task:
- return curses.beep()
- reply = self.app.control.revoke(self.selected_task, reply=True)
- self.alert_remote_control_reply(reply)
- def selection_info(self):
- if not self.selected_task:
- return
- def alert_callback(mx, my, xs):
- my, mx = self.win.getmaxyx()
- y = count(xs)
- task = self.state.tasks[self.selected_task]
- info = task.info(extra=['state'])
- infoitems = [('args', info.pop('args', None)),
- ('kwargs', info.pop('kwargs', None))
- ] + list(info.items())
- for key, value in infoitems:
- if key is None:
- continue
- value = str(value)
- curline = next(y)
- keys = key + ': '
- self.win.addstr(curline, 3, keys, curses.A_BOLD)
- wrapped = wrap(value, mx - 2)
- if len(wrapped) == 1:
- self.win.addstr(curline, len(keys) + 3,
- abbr(wrapped[0],
- self.screen_width - (len(keys) + 3)))
- else:
- for subline in wrapped:
- nexty = next(y)
- if nexty >= my - 1:
- subline = ' ' * 4 + '[...]'
- elif nexty >= my:
- break
- self.win.addstr(nexty, 3,
- abbr(' ' * 4 + subline, self.screen_width - 4),
- curses.A_NORMAL)
- return self.alert(alert_callback,
- 'Task details for {0.selected_task}'.format(self))
- def selection_traceback(self):
- if not self.selected_task:
- return curses.beep()
- task = self.state.tasks[self.selected_task]
- if task.state not in states.EXCEPTION_STATES:
- return curses.beep()
- def alert_callback(my, mx, xs):
- y = count(xs)
- for line in task.traceback.split('\n'):
- self.win.addstr(next(y), 3, line)
- return self.alert(alert_callback,
- 'Task Exception Traceback for {0.selected_task}'.format(self))
- def selection_result(self):
- if not self.selected_task:
- return
- def alert_callback(my, mx, xs):
- y = count(xs)
- task = self.state.tasks[self.selected_task]
- result = getattr(task, 'result', None) or getattr(task,
- 'exception', None)
- for line in wrap(result, mx - 2):
- self.win.addstr(next(y), 3, line)
- return self.alert(alert_callback,
- 'Task Result for {0.selected_task}'.format(self))
- def display_task_row(self, lineno, task):
- state_color = self.state_colors.get(task.state)
- attr = curses.A_NORMAL
- if task.uuid == self.selected_task:
- attr = curses.A_STANDOUT
- timestamp = datetime.utcfromtimestamp(
- task.timestamp or time.time())
- timef = timestamp.strftime('%H:%M:%S')
- hostname = task.worker.hostname if task.worker else '*NONE*'
- line = self.format_row(task.uuid, task.name,
- hostname,
- timef, task.state)
- self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)
- if state_color:
- self.win.addstr(lineno,
- len(line) - STATE_WIDTH + BORDER_SPACING - 1,
- task.state, state_color | attr)
- def draw(self):
- win = self.win
- self.handle_keypress()
- x = LEFT_BORDER_OFFSET
- y = blank_line = count(2)
- my, mx = win.getmaxyx()
- win.erase()
- win.bkgd(' ', curses.color_pair(1))
- win.border()
- win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
- next(blank_line)
- win.addstr(next(y), x, self.format_row('UUID', 'TASK',
- 'WORKER', 'TIME', 'STATE'),
- curses.A_BOLD | curses.A_UNDERLINE)
- tasks = self.tasks
- if tasks:
- for row, (uuid, task) in enumerate(tasks):
- if row > self.display_height:
- break
- if task.uuid:
- lineno = next(y)
- self.display_task_row(lineno, task)
- # -- Footer
- next(blank_line)
- win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)
- # Selected Task Info
- if self.selected_task:
- win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
- info = 'Missing extended info'
- detail = ''
- try:
- selection = self.state.tasks[self.selected_task]
- except KeyError:
- pass
- else:
- info = selection.info()
- if 'runtime' in info:
- info['runtime'] = '{0:.2fs}'.format(info['runtime'])
- if 'result' in info:
- info['result'] = abbr(info['result'], 16)
- info = ' '.join('{0}={1}'.format(key, value)
- for key, value in items(info))
- detail = '... -> key i'
- infowin = abbr(info,
- self.screen_width - len(self.selected_str) - 2,
- detail)
- win.addstr(my - 5, x + len(self.selected_str), infowin)
- # Make ellipsis bold
- if detail in infowin:
- detailpos = len(infowin) - len(detail)
- win.addstr(my - 5, x + len(self.selected_str) + detailpos,
- detail, curses.A_BOLD)
- else:
- win.addstr(my - 5, x, 'No task selected', curses.A_NORMAL)
- # Workers
- if self.workers:
- win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
- win.addstr(my - 4, x + len(self.online_str),
- ', '.join(sorted(self.workers)), curses.A_NORMAL)
- else:
- win.addstr(my - 4, x, 'No workers discovered.')
- # Info
- win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
- win.addstr(my - 3, x + len(self.info_str),
- STATUS_SCREEN.format(s=self.state,
- w_alive=len([w for w in values(self.state.workers)
- if w.alive]),
- w_all=len(self.state.workers)),
- curses.A_DIM)
- # Help
- self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
- self.safe_add_str(my - 2, x + len(self.help_title), self.help,
- curses.A_DIM)
- win.refresh()
- def safe_add_str(self, y, x, string, *args, **kwargs):
- if x + len(string) > self.screen_width:
- string = string[:self.screen_width - x]
- self.win.addstr(y, x, string, *args, **kwargs)
- def init_screen(self):
- self.win = curses.initscr()
- self.win.nodelay(True)
- self.win.keypad(True)
- curses.start_color()
- curses.init_pair(1, self.foreground, self.background)
- # exception states
- curses.init_pair(2, curses.COLOR_RED, self.background)
- # successful state
- curses.init_pair(3, curses.COLOR_GREEN, self.background)
- # revoked state
- curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
- # greeting
- curses.init_pair(5, curses.COLOR_BLUE, self.background)
- # started state
- curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
- self.state_colors = {states.SUCCESS: curses.color_pair(3),
- states.REVOKED: curses.color_pair(4),
- states.STARTED: curses.color_pair(6)}
- for state in states.EXCEPTION_STATES:
- self.state_colors[state] = curses.color_pair(2)
- curses.cbreak()
- def resetscreen(self):
- curses.nocbreak()
- self.win.keypad(False)
- curses.echo()
- curses.endwin()
- def nap(self):
- curses.napms(self.screen_delay)
- @property
- def tasks(self):
- return list(self.state.tasks_by_time(limit=self.limit))
- @property
- def workers(self):
- return [hostname
- for hostname, w in items(self.state.workers)
- if w.alive]
- class DisplayThread(threading.Thread): # pragma: no cover
- def __init__(self, display):
- self.display = display
- self.shutdown = False
- threading.Thread.__init__(self)
- def run(self):
- while not self.shutdown:
- self.display.draw()
- self.display.nap()
- def capture_events(app, state, display): # pragma: no cover
- def on_connection_error(exc, interval):
- print('Connection Error: {0!r}. Retry in {1}s.'.format(
- exc, interval), file=sys.stderr)
- while 1:
- print('-> evtop: starting capture...', file=sys.stderr)
- with app.connection() as conn:
- try:
- conn.ensure_connection(on_connection_error,
- app.conf.BROKER_CONNECTION_MAX_RETRIES)
- recv = app.events.Receiver(conn, handlers={'*': state.event})
- display.resetscreen()
- display.init_screen()
- recv.capture()
- except conn.connection_errors + conn.channel_errors as exc:
- print('Connection lost: {0!r}'.format(exc), file=sys.stderr)
- def evtop(app=None): # pragma: no cover
- app = app_or_default(app)
- state = app.events.State()
- display = CursesMonitor(state, app=app)
- display.init_screen()
- refresher = DisplayThread(display)
- refresher.start()
- try:
- capture_events(app, state, display)
- except Exception:
- refresher.shutdown = True
- refresher.join()
- display.resetscreen()
- raise
- except (KeyboardInterrupt, SystemExit):
- refresher.shutdown = True
- refresher.join()
- display.resetscreen()
- if __name__ == '__main__': # pragma: no cover
- evtop()
|