123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486 |
- import celery
- import curses
- import sys
- import threading
- import time
- from datetime import datetime
- from itertools import count
- from textwrap import wrap
- from math import ceil
- from celery import states
- from celery.app import app_or_default
- from celery.utils import abbr, abbrtask
# Layout constants (all measured in character cells).

# Columns subtracted from the window width to get the usable row width
# (see ``CursesMonitor.display_width``).
BORDER_SPACING = 4
# Column at which task rows and footer lines start being drawn.
LEFT_BORDER_OFFSET = 3
# Widest the UUID column is allowed to get (a canonical UUID string is
# 36 characters long).
UUID_WIDTH = 36
# Fixed width of the STATE column.
STATE_WIDTH = 8
# Fixed width of the TIME column ("%H:%M:%S" is 8 characters).
TIMESTAMP_WIDTH = 8
# Space reserved for the WORKER and TASK columns before the UUID column
# is allowed to take its share (see ``CursesMonitor.format_row``).
MIN_WORKER_WIDTH = 15
MIN_TASK_WIDTH = 16
class CursesMonitor(object):
    """Curses user interface for an events ``state`` object.

    The monitor draws a table of the most recent tasks known to
    ``state``, a footer with details about the selected task, the
    workers that are alive and some counters, and dispatches key
    presses to the handlers registered in :attr:`keymap`.

    :param state: object exposing ``tasks``, ``workers``,
        ``tasks_by_timestamp()``, ``event_count`` and ``task_count``.
    :param keymap: optional mapping of upper-cased key -> callable,
        merged on top of the default key bindings.
    :param app: application instance, resolved with ``app_or_default``.
    """

    keymap = {}
    win = None
    # NOTE: ``screen_width`` used to be declared here as ``None`` but was
    # always shadowed by the ``screen_width`` property defined below.
    screen_delay = 10
    selected_task = None
    selected_position = 0
    selected_str = "Selected: "
    foreground = curses.COLOR_BLACK
    background = curses.COLOR_WHITE
    online_str = "Workers online: "
    help_title = "Keys: "
    # J is bound to "move down" and K to "move up" in ``__init__``.
    help = ("j:down k:up i:info t:traceback r:result c:revoke ^c: quit")
    greet = "celeryev %s" % celery.__version__
    info_str = "Info: "

    # Aliases applied to the key read by ``handle_keypress`` before the
    # keymap lookup.  ``window.getkey()`` reports special keys by *name*
    # (a string), so the names must be listed; the integer key codes are
    # kept for backward compatibility.
    keyalias = {curses.KEY_DOWN: "J",
                curses.KEY_UP: "K",
                curses.KEY_ENTER: "I",
                "KEY_DOWN": "J",
                "KEY_UP": "K",
                "KEY_ENTER": "I"}

    def __init__(self, state, keymap=None, app=None):
        self.app = app_or_default(app)
        self.keymap = keymap or self.keymap
        self.state = state
        default_keymap = {"J": self.move_selection_down,
                          "K": self.move_selection_up,
                          "C": self.revoke_selection,
                          "T": self.selection_traceback,
                          "R": self.selection_result,
                          "I": self.selection_info,
                          "L": self.selection_rate_limit}
        # User supplied bindings take precedence over the defaults.
        self.keymap = dict(default_keymap, **self.keymap)

    def format_row(self, uuid, task, worker, timestamp, state):
        """Return one table row, padded/abbreviated to the display width.

        The STATE and TIME columns have fixed widths; the UUID column
        gets at most ``UUID_WIDTH`` cells and whatever is left is split
        between the TASK and WORKER columns.
        """
        mx = self.display_width

        # Each "- 1" accounts for the single space between two columns.
        detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
        uuid_space = (detail_width - 1 - MIN_TASK_WIDTH
                      - 1 - MIN_WORKER_WIDTH)
        uuid_width = min(uuid_space, UUID_WIDTH)

        detail_width = detail_width - uuid_width - 1
        task_width = int(ceil(detail_width / 2.0))
        worker_width = detail_width - task_width - 1

        uuid = abbr(uuid, uuid_width).ljust(uuid_width)
        worker = abbr(worker, worker_width).ljust(worker_width)
        task = abbrtask(task, task_width).ljust(task_width)
        state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
        timestamp = timestamp.ljust(TIMESTAMP_WIDTH)

        row = "%s %s %s %s %s " % (uuid, worker, task, timestamp, state)
        return row[:mx]

    @property
    def screen_width(self):
        """Width of the curses window in columns."""
        _, mx = self.win.getmaxyx()
        return mx

    @property
    def screen_height(self):
        """Height of the curses window in rows."""
        my, _ = self.win.getmaxyx()
        return my

    @property
    def display_width(self):
        """Window width minus ``BORDER_SPACING``."""
        _, mx = self.win.getmaxyx()
        return mx - BORDER_SPACING

    @property
    def display_height(self):
        """Window height minus the 10 rows used by header and footer."""
        my, _ = self.win.getmaxyx()
        return my - 10

    @property
    def limit(self):
        """Maximum number of tasks listed (same as the display height)."""
        return self.display_height

    def find_position(self):
        """Return the index of the selected task in :attr:`tasks`.

        Returns 0 when there are no tasks or the selection is not listed.
        """
        for index, entry in enumerate(self.tasks or ()):
            if self.selected_task == entry[0]:
                return index
        return 0

    def move_selection_up(self):
        """Select the task one row above the current selection."""
        self.move_selection(-1)

    def move_selection_down(self):
        """Select the task one row below the current selection."""
        self.move_selection(1)

    def move_selection(self, direction=1):
        """Move the selection by ``direction`` rows.

        Stepping past the last row selects the first one; stepping up
        from the first row lands on the last one because of negative
        indexing.
        """
        if not self.tasks:
            return
        pos = self.find_position()
        try:
            self.selected_task = self.tasks[pos + direction][0]
        except IndexError:
            self.selected_task = self.tasks[0][0]

    def handle_keypress(self):
        """Read one pending key (if any) and run the handler bound to it."""
        try:
            key = self.win.getkey().upper()
        except curses.error:
            # No-delay mode: getkey() raises when no input is pending.
            return
        key = self.keyalias.get(key) or key
        handler = self.keymap.get(key)
        if handler is not None:
            handler()

    def alert(self, callback, title=None):
        """Clear the window, show ``title``, let ``callback`` draw the
        body and block until a key is pressed.

        :param callback: called as ``callback(my, mx, first_row)`` where
            ``my, mx`` is the window size and ``first_row`` the first row
            it may draw on.
        :returns: the upper-cased key that dismissed the alert.
        """
        self.win.erase()
        my, mx = self.win.getmaxyx()
        rows = count(2)
        if title:
            self.win.addstr(next(rows), 3, title,
                            curses.A_BOLD | curses.A_UNDERLINE)
            next(rows)  # blank line below the title
        callback(my, mx, next(rows))
        self.win.addstr(my - 1, 0, "Press any key to continue...",
                        curses.A_BOLD)
        self.win.refresh()
        while True:
            try:
                return self.win.getkey().upper()
            except curses.error:
                # Nothing typed yet; sleep instead of spinning the CPU.
                self.nap()

    def _selected_task_state(self):
        """Return the state entry of the selected task, or ``None`` when
        nothing is selected or the task is no longer present in the
        state."""
        if not self.selected_task:
            return None
        try:
            return self.state.tasks[self.selected_task]
        except KeyError:
            return None

    def selection_rate_limit(self):
        """Prompt for a rate limit and apply it to the selected task's
        task name through remote control, then show the replies."""
        task = self._selected_task_state()
        if task is None or not task.name:
            return curses.beep()

        my, mx = self.win.getmaxyx()
        prompt = "New rate limit: "
        self.win.addstr(my - 2, 3, prompt,
                        curses.A_BOLD | curses.A_UNDERLINE)
        # Blank the rest of the row; safe_add_str clips the padding so it
        # does not spill onto the next row.
        self.safe_add_str(my - 2, len(prompt) + 3, " " * (mx - len(prompt)))
        rlimit = self.readline(my - 2, 3 + len(prompt))

        if rlimit:
            reply = self.app.control.rate_limit(task.name,
                                                rlimit.strip(), reply=True)
            self.alert_remote_control_reply(reply)

    def alert_remote_control_reply(self, reply):
        """Show the replies of a remote control command in an alert.

        :param reply: sequence of single-item ``{host: response}``
            mappings; a falsy value is reported as "no replies".
        """

        def callback(my, mx, xs):
            rows = count(xs)
            if not reply:
                self.win.addstr(next(rows), 3,
                                "No replies received in 1s deadline.",
                                curses.A_BOLD | curses.color_pair(2))
                return

            for subreply in reply:
                curline = next(rows)

                host, response = list(subreply.items())[0]
                host = "%s: " % host
                self.win.addstr(curline, 3, host, curses.A_BOLD)
                attr = curses.A_NORMAL
                text = ""
                if "error" in response:
                    text = response["error"]
                    attr |= curses.color_pair(2)
                elif "ok" in response:
                    text = response["ok"]
                    attr |= curses.color_pair(3)
                self.win.addstr(curline, 3 + len(host), text, attr)

        return self.alert(callback, "Remote Control Command Replies")

    def readline(self, x, y):
        """Read a line typed by the user, echoing it on screen.

        NOTE: despite their names, ``x`` is the *row* and ``y`` the
        starting *column* (they are passed to ``getch`` in that order).

        :returns: the typed text, or an empty string if escape (27) was
            pressed.  Input ends on newline (10) or ``KEY_ENTER``.
        """
        typed = []
        curses.echo()
        try:
            offset = 0
            while True:
                ch = self.win.getch(x, y + offset)
                if ch == -1:
                    # No input pending (no-delay mode): avoid busy waiting.
                    self.nap()
                    continue
                if ch in (10, curses.KEY_ENTER):            # enter
                    break
                if ch == 27:                                # escape
                    typed = []
                    break
                if not 0 <= ch < 256:
                    # Special keys (arrows, function keys...) are not
                    # characters and cannot be converted with chr().
                    continue
                typed.append(chr(ch))
                offset += 1
        finally:
            curses.noecho()
        return "".join(typed)

    def revoke_selection(self):
        """Revoke the selected task and show the workers' replies."""
        if not self.selected_task:
            return curses.beep()
        reply = self.app.control.revoke(self.selected_task, reply=True)
        self.alert_remote_control_reply(reply)

    def selection_info(self):
        """Show all known fields of the selected task in an alert."""
        task = self._selected_task_state()
        if task is None:
            return

        def alert_callback(my, mx, xs):
            my, mx = self.win.getmaxyx()
            rows = count(xs)
            # Row ``my - 1`` is used by the alert's "Press any key"
            # prompt, so nothing is drawn below ``my - 2``.
            last_row = my - 2
            info = task.info(extra=["state"])
            # Show args and kwargs first, then the remaining fields.
            infoitems = [("args", info.pop("args", None)),
                         ("kwargs", info.pop("kwargs", None))]
            infoitems += list(info.items())
            for key, value in infoitems:
                if key is None:
                    continue
                value = str(value)
                curline = next(rows)
                if curline > last_row:
                    break
                keys = key + ": "
                self.win.addstr(curline, 3, keys, curses.A_BOLD)
                wrapped = wrap(value, mx - 2)
                if len(wrapped) == 1:
                    self.win.addstr(curline, len(keys) + 3,
                            abbr(wrapped[0],
                                 self.screen_width - (len(keys) + 3)))
                    continue

                out_of_room = False
                for subline in wrapped:
                    nexty = next(rows)
                    if nexty >= last_row:
                        # No room left: mark the truncation and stop.
                        self.win.addstr(last_row, 3, " " * 4 + "[...]",
                                        curses.A_NORMAL)
                        out_of_room = True
                        break
                    self.win.addstr(nexty, 3,
                            abbr(" " * 4 + subline, self.screen_width - 4),
                            curses.A_NORMAL)
                if out_of_room:
                    break

        return self.alert(alert_callback,
                "Task details for %s" % self.selected_task)

    def selection_traceback(self):
        """Show the traceback of the selected task in an alert.

        Beeps if nothing is selected or the task is not in one of the
        exception states.
        """
        task = self._selected_task_state()
        if task is None or task.state not in states.EXCEPTION_STATES:
            return curses.beep()

        def alert_callback(my, mx, xs):
            rows = count(xs)
            for line in (task.traceback or "").split("\n"):
                row = next(rows)
                if row >= my - 1:
                    # Keep clear of the prompt row and the window bottom.
                    break
                self.safe_add_str(row, 3, line)

        return self.alert(alert_callback,
                "Task Exception Traceback for %s" % self.selected_task)

    def selection_result(self):
        """Show the result (or exception) of the selected task in an
        alert."""
        task = self._selected_task_state()
        if task is None:
            return

        def alert_callback(my, mx, xs):
            rows = count(xs)
            result = (getattr(task, "result", None)
                      or getattr(task, "exception", None))
            # ``result`` is None while the task has not finished.
            for line in wrap(result or "", mx - 2):
                row = next(rows)
                if row >= my - 1:
                    break
                self.safe_add_str(row, 3, line)

        return self.alert(alert_callback,
                "Task Result for %s" % self.selected_task)

    def display_task_row(self, lineno, task):
        """Draw ``task`` as one table row on screen row ``lineno``."""
        state_color = self.state_colors.get(task.state)
        attr = curses.A_NORMAL
        if task.uuid == self.selected_task:
            attr = curses.A_STANDOUT
        timestamp = datetime.fromtimestamp(
                        task.timestamp or time.time())
        timef = timestamp.strftime("%H:%M:%S")
        line = self.format_row(task.uuid, task.name,
                               task.worker.hostname,
                               timef, task.state)
        self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)

        if state_color:
            # Re-draw the state column on top of the row, in colour.
            self.win.addstr(lineno,
                            len(line) - STATE_WIDTH + BORDER_SPACING - 1,
                            task.state, state_color | attr)
        if task.ready:
            task.visited = time.time()

    def draw(self):
        """Process pending input, then redraw the whole screen."""
        win = self.win
        self.handle_keypress()
        x = LEFT_BORDER_OFFSET
        rows = count(2)
        my, mx = win.getmaxyx()
        win.erase()
        win.bkgd(" ", curses.color_pair(1))
        win.border()
        win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
        next(rows)  # blank line between the greeting and the header
        win.addstr(next(rows), x, self.format_row("UUID", "TASK",
                                                  "WORKER", "TIME", "STATE"),
                   curses.A_BOLD | curses.A_UNDERLINE)
        for row, (uuid, task) in enumerate(self.tasks or ()):
            if row > self.display_height:
                break

            if task.uuid:
                self.display_task_row(next(rows), task)

        # -- Footer
        win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)

        # Selected Task Info
        if self.selected_task:
            win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
            info = "Missing extended info"
            selection = self._selected_task_state()
            if selection is not None:
                info = selection.info(["args", "kwargs",
                                       "result", "runtime", "eta"])
                if "runtime" in info:
                    info["runtime"] = "%.2fs" % info["runtime"]
                if "result" in info:
                    info["result"] = abbr(info["result"], 16)
                info = " ".join("%s=%s" % (key, value)
                                for key, value in info.items())
            detail = "... -> key i"
            infowin = abbr(info,
                           self.screen_width - len(self.selected_str) - 2,
                           detail)
            win.addstr(my - 5, x + len(self.selected_str), infowin)
            # Make ellipsis bold
            if detail in infowin:
                detailpos = len(infowin) - len(detail)
                win.addstr(my - 5, x + len(self.selected_str) + detailpos,
                           detail, curses.A_BOLD)
        else:
            win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)

        # Workers
        workers = self.workers
        if workers:
            win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
            # The list can be wider than the window: clip it.
            self.safe_add_str(my - 4, x + len(self.online_str),
                              ", ".join(sorted(workers)), curses.A_NORMAL)
        else:
            win.addstr(my - 4, x, "No workers discovered.")

        # Info
        win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
        win.addstr(my - 3, x + len(self.info_str),
                "events:%s tasks:%s workers:%s/%s" % (
                    self.state.event_count, self.state.task_count,
                    len([w for w in self.state.workers.values()
                            if w.alive]),
                    len(self.state.workers)),
                curses.A_DIM)

        # Help
        self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
        self.safe_add_str(my - 2, x + len(self.help_title), self.help,
                          curses.A_DIM)
        win.refresh()

    def safe_add_str(self, y, x, string, *args, **kwargs):
        """Like ``win.addstr`` but clips ``string`` at the right edge of
        the window."""
        if x + len(string) > self.screen_width:
            string = string[:self.screen_width - x]
        self.win.addstr(y, x, string, *args, **kwargs)

    def init_screen(self):
        """Initialise curses, the colour pairs and the state colours."""
        self.win = curses.initscr()
        self.win.nodelay(True)
        self.win.keypad(True)
        curses.start_color()
        curses.init_pair(1, self.foreground, self.background)
        # exception states
        curses.init_pair(2, curses.COLOR_RED, self.background)
        # successful state
        curses.init_pair(3, curses.COLOR_GREEN, self.background)
        # revoked state
        curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
        # greeting
        curses.init_pair(5, curses.COLOR_BLUE, self.background)
        # started state
        curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)

        self.state_colors = {states.SUCCESS: curses.color_pair(3),
                             states.REVOKED: curses.color_pair(4),
                             states.STARTED: curses.color_pair(6)}
        for state in states.EXCEPTION_STATES:
            self.state_colors[state] = curses.color_pair(2)

        curses.cbreak()

    def resetscreen(self):
        """Undo :meth:`init_screen` and leave curses mode."""
        curses.nocbreak()
        self.win.keypad(False)
        curses.echo()
        curses.endwin()

    def nap(self):
        """Sleep for :attr:`screen_delay` milliseconds."""
        curses.napms(self.screen_delay)

    @property
    def tasks(self):
        """The first :attr:`limit` entries of
        ``state.tasks_by_timestamp()``."""
        return self.state.tasks_by_timestamp()[:self.limit]

    @property
    def workers(self):
        """Hostnames of the workers whose ``alive`` flag is set."""
        return [hostname
                    for hostname, w in self.state.workers.items()
                        if w.alive]
class DisplayThread(threading.Thread):
    """Background thread that keeps redrawing a display.

    Until :attr:`shutdown` is set to a true value, the thread calls
    ``display.draw()`` followed by ``display.nap()`` in a loop.
    """

    def __init__(self, display):
        super(DisplayThread, self).__init__()
        # Flag polled by run(); set it to True to stop the loop.
        self.shutdown = False
        self.display = display

    def run(self):
        while True:
            if self.shutdown:
                break
            self.display.draw()
            self.display.nap()
def evtop(app=None):
    """Run the curses event monitor until interrupted.

    Captures events from the broker into an events state on the calling
    thread while a :class:`DisplayThread` redraws a
    :class:`CursesMonitor` for that state.

    :param app: application instance, resolved with ``app_or_default``.

    ``KeyboardInterrupt`` and ``SystemExit`` end the monitor silently;
    any other exception is re-raised.  In every case the display thread
    is stopped, the terminal is restored and the broker connection is
    closed before returning.
    """
    sys.stderr.write("-> evtop: starting capture...\n")
    app = app_or_default(app)
    state = app.events.State()
    conn = app.broker_connection()
    recv = app.events.Receiver(conn, handlers={"*": state.event})
    capture = recv.itercapture()
    # Advance the capture iterator once before the display starts (the
    # value it yields is not needed here).
    next(capture)

    display = CursesMonitor(state, app=app)
    display.init_screen()
    refresher = DisplayThread(display)
    refresher.start()

    try:
        next(capture)
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        # Stop drawing before leaving curses mode, then release the
        # connection; runs on normal exit, interrupt and error alike.
        refresher.shutdown = True
        refresher.join()
        display.resetscreen()
        if conn:
            conn.close()
# Start the monitor when this module is executed as a script.
if __name__ == "__main__":
    evtop()
|