Pārlūkot izejas kodu

celeryev: Curses tool now a *lot* better :)

Ask Solem 15 gadi atpakaļ
vecāks
revīzija
4efa415b11
3 mainītis faili ar 198 papildinājumiem un 41 dzēšanām
  1. 180 36
      celery/bin/celeryev.py
  2. 6 2
      celery/events/__init__.py
  3. 12 3
      celery/events/state.py

+ 180 - 36
celery/bin/celeryev.py

@@ -1,5 +1,9 @@
 import sys
+import time
 import curses
+import atexit
+import socket
+import threading
 
 from datetime import datetime
 from itertools import count
@@ -60,49 +64,189 @@ def eventdump():
         conn and conn.close()
 
 
-def main():
-    sys.stderr.write("-> celeryev: starting capture...\n")
-    stdscr = curses.initscr()
-    curses.cbreak()
-
-    i = count(0).next
-    my, mx = stdscr.getmaxyx()
-    def callback(state, event):
-        workers = []
-        for worker in state.workers.values():
-            if worker.hostname:
-                workers.append("%s %s" % (worker.hostname,
-                    worker.alive and "online" or "offline"))
-        tasks = []
-        for uuid, task in state.tasks_by_timestamp():
-            if task.uuid and not task.visited:
-                tasks.append("%s %s %s" % (task.uuid,
-                    task.name, task.state))
+
+def abbr(S, max, dots=True):
+    if S is None:
+        return "???"
+    if len(S) > max:
+        return dots and S[:max-3] + "..." or S[:max-3]
+    return S
+
+
+def abbrtask(S, max):
+    if S is None:
+        return "???"
+    if len(S) > max:
+        module, _, cls = rpartition(S, ".")
+        module = abbr(module, max - len(cls), False)
+        return module + "[.]" + cls
+    return S
+
+
+class CursesMonitor(object):
+    screen_width = None
+    selected_task = None
+    selected_position = 0
+    selected_str = "Selected: "
+    limit = 20
+    foreground = curses.COLOR_BLACK
+    background = curses.COLOR_WHITE
+    online_str = "Workers online: "
+
+    def __init__(self, state):
+        self.state = state
+
+    def format_row(self, uuid, task, worker, state):
+        uuid = uuid.ljust(36)
+        worker = abbr(worker, 16).ljust(16)
+        task = abbrtask(task, 20).ljust(30)
+        state = abbr(state, 8).ljust(8)
+        row = " %s %s %s %s " % (uuid, worker, task, state)
+        if self.screen_width is None:
+            self.screen_width = len(row)
+        return row
+
+    def find_position(self):
+        if not self.tasks:
+            return 0
+        for i, e in enumerate(self.tasks):
+            if self.selected_task == e[0]:
+                return i
+        return 0
+
+    def move_selection(self, reverse=False):
+        if not self.tasks:
+            return
+        incr = reverse and -1 or 1
+        pos = self.find_position() + incr
+        try:
+            self.selected_task = self.tasks[pos][0]
+        except IndexError:
+            self.selected_task = self.tasks[0][0]
+
+    def handle_keypress(self):
+        try:
+            key = self.win.getkey().upper()
+        except:
+            return
+        if key in ("KEY_DOWN", "J"):
+            self.move_selection()
+        elif key in ("KEY_UP", "K"):
+            self.move_selection(reverse=True)
+        elif key in ("Q", ):
+            raise KeyboardInterrupt
+
+    def draw(self):
+        self.handle_keypress()
+        win = self.win
+        x = 3
+        y = blank_line = count(2).next
+        my, mx = win.getmaxyx()
+        win.erase()
+        win.bkgd(" ", curses.color_pair(1))
+        win.border()
+        win.addstr(y(), x, self.format_row("UUID", "TASK", "WORKER", "STATE"),
+                curses.A_STANDOUT)
+        for uuid, task in self.tasks:
+            if task.uuid:
+                attr = curses.A_NORMAL
+                if task.uuid == self.selected_task:
+                    attr = curses.A_STANDOUT
+                win.addstr(y(), x, self.format_row(uuid,
+                    task.name, task.worker.hostname, task.state),
+                    attr)
                 if task.ready:
-                    task.visited = True
-        for i, line in enumerate(workers):
-            stdscr.addstr(i, 0, line, curses.A_REVERSE)
-            stdscr.addstr(i, len(line), " " * (my - len(line)),
-                    curses.A_DIM)
-        stdscr.addstr(i + 1, 0, " " * 32, curses.A_UNDERLINE)
-        stdscr.addstr(i + 2, 0, " " * 32, curses.A_DIM)
-        for j, line in enumerate(tasks):
-            line = line + "    "
-            line = line + " " * (my - len(line))
-            stdscr.addstr(j + i + 3, 0, line, curses.A_DIM)
-            stdscr.addstr(i, len(line), " " * (my - len(line)),
-                    curses.A_DIM)
-        stdscr.refresh()
+                    task.visited = time.time()
+
+        if self.selected_task:
+            win.addstr(my - 3, x, self.selected_str, curses.A_BOLD)
+            info = "Missing extended info"
+            try:
+                selection = self.state.tasks[self.selected_task]
+            except KeyError:
+                pass
+            else:
+                info = selection.info
+                if "runtime" in info:
+                    info["runtime"] = "%.2fs" % info["runtime"]
+                info = " ".join("%s=%s" % (key, value)
+                            for key, value in info.items())
+            win.addstr(my - 3, x + len(self.selected_str), info)
+
+        else:
+            win.addstr(my - 3, x, "No task selected", curses.A_NORMAL)
+        if self.workers:
+            win.addstr(my - 2, x, self.online_str, curses.A_BOLD)
+            win.addstr(my - 2, x + len(self.online_str),
+                    ", ".join(self.workers), curses.A_NORMAL)
+        else:
+            win.addstr(my - 2, x, "No workers discovered.")
+        win.refresh()
+
+    def setupscreen(self):
+        self.win = curses.initscr()
+        curses.start_color()
+        curses.init_pair(1, self.foreground, self.background)
+        curses.cbreak()
+        self.win.nodelay(True)
+        self.win.keypad(True)
+
+    def resetscreen(self):
+        curses.nocbreak()
+        curses.echo()
+        curses.endwin()
+
+    @property
+    def tasks(self):
+        return self.state.tasks_by_timestamp()[:self.limit]
+
+    @property
+    def workers(self):
+        return [hostname
+                    for hostname, w in self.state.workers.items()
+                        if w.alive]
+
 
+class DisplayThread(threading.Thread):
+
+    def __init__(self, display):
+        self.display = display
+        self.shutdown = False
+        threading.Thread.__init__(self)
+
+    def run(self):
+        while not self.shutdown:
+            self.display.draw()
+
+def main():
+    sys.stderr.write("-> celeryev: starting capture...\n")
+    state = State()
+    display = CursesMonitor(state)
+    display.setupscreen()
+    refresher = DisplayThread(display)
+    refresher.start()
     conn = establish_connection()
-    state = State(callback)
     recv = EventReceiver(conn, handlers={"*": state.event})
     try:
-        recv.capture()
+        consumer = recv.consumer()
+        consumer.consume()
+        while True:
+            try:
+                conn.connection.drain_events()
+            except socket.timeout:
+                pass
+    except Exception, exc:
+        refresher.shutdown = True
+        refresher.join()
+        display.resetscreen()
+        raise
     except (KeyboardInterrupt, SystemExit):
         conn and conn.close()
-        curses.nocbreak()
-        curses.echo()
+        refresher.shutdown = True
+        refresher.join()
+        display.resetscreen()
+
+
 
 
 

+ 6 - 2
celery/events/__init__.py

@@ -92,6 +92,11 @@ class EventReceiver(object):
         handler = self.handlers.get(type) or self.handlers.get("*")
         handler and handler(event)
 
+    def consumer(self):
+        consumer = EventConsumer(self.connection)
+        consumer.register_callback(self._receive)
+        return consumer
+
     def capture(self, limit=None):
         """Open up a consumer capturing events.
 
@@ -99,8 +104,7 @@ class EventReceiver(object):
         stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
 
         """
-        consumer = EventConsumer(self.connection)
-        consumer.register_callback(self._receive)
+        consumer = self.consume()
         it = consumer.iterconsume(limit=limit)
         while True:
             it.next()

+ 12 - 3
celery/events/state.py

@@ -34,16 +34,24 @@ class Worker(Thing):
 
 
 class Task(Thing):
+    _info_fields = ("args", "kwargs", "retries", "result", "eta", "runtime")
     uuid = None
     name = None
     state = states.PENDING
     received = False
     accepted = False
-    args = '[]'
-    kwargs = '{}'
+    args = None
+    kwargs = None
     eta = None
     retries = 0
     worker = None
+    timestamp = None
+
+    @property
+    def info(self):
+        return dict((key, getattr(self, key, None))
+                        for key in self._info_fields
+                            if getattr(self, key, None) is not None)
 
     @property
     def ready(self):
@@ -130,6 +138,7 @@ class State(object):
             self.callback(self, event)
 
     def tasks_by_timestamp(self):
-        return sorted(self.tasks.items(), key=lambda t: t[1].timestamp)
+        return sorted(self.tasks.items(), key=lambda t: t[1].timestamp,
+                reverse=True)
 
 state = State()