Pārlūkot izejas kodu

More awesome improvements to celeryev

Ask Solem 15 gadi atpakaļ
vecāks
revīzija
ba94fa0e6f
2 mainītis faili ar 99 papildinājumiem un 34 dzēšanām
  1. 91 33
      celery/bin/celeryev.py
  2. 8 1
      celery/events/state.py

+ 91 - 33
celery/bin/celeryev.py

@@ -9,6 +9,8 @@ import threading
 from datetime import datetime
 from itertools import count
 
+import celery
+from celery import states
 from celery.task import control
 from celery.events import EventReceiver
 from celery.events.state import State
@@ -84,6 +86,7 @@ def abbrtask(S, max):
 
 
 class CursesMonitor(object):
+    win = None
     screen_width = None
     selected_task = None
     selected_position = 0
@@ -92,21 +95,29 @@ class CursesMonitor(object):
     foreground = curses.COLOR_BLACK
     background = curses.COLOR_WHITE
     online_str = "Workers online: "
-    help = ("Keys: j, k: Move selection up/down. "
-            "r: revoke selected task. q: quit")
+    help_title = "Keys: "
+    help = ("j, k: sel up/down r: revoke sel q: quit")
+    greet = "celeryev %s" % celery.__version__
+    info_str = "Info: "
 
     def __init__(self, state):
         self.state = state
 
-    def format_row(self, uuid, task, worker, state):
-        uuid = uuid.ljust(36)
+    def format_row(self, uuid, worker, task, time, state):
+        my, mx = self.win.getmaxyx()
+        mx = mx - 3
+        uuid_max = 36
+        if mx < 88:
+            uuid_max = mx - 52 - 2
+        uuid = abbr(uuid, uuid_max).ljust(uuid_max)
         worker = abbr(worker, 16).ljust(16)
-        task = abbrtask(task, 20).ljust(30)
+        task = abbrtask(task, 16).ljust(16)
         state = abbr(state, 8).ljust(8)
-        row = " %s %s %s %s " % (uuid, worker, task, state)
+        time = time.ljust(8)
+        row = "%s %s %s %s %s " % (uuid, worker, task, time, state)
         if self.screen_width is None:
-            self.screen_width = len(row)
-        return row
+            self.screen_width = len(row[:mx])
+        return row[:mx]
 
     def find_position(self):
         if not self.tasks:
@@ -144,29 +155,47 @@ class CursesMonitor(object):
         control.revoke(self.selected_task)
 
     def draw(self):
-        self.handle_keypress()
         win = self.win
+        self.handle_keypress()
         x = 3
         y = blank_line = count(2).next
         my, mx = win.getmaxyx()
         win.erase()
         win.bkgd(" ", curses.color_pair(1))
         win.border()
-        win.addstr(y(), x, self.format_row("UUID", "TASK", "WORKER", "STATE"),
-                curses.A_STANDOUT)
-        for uuid, task in self.tasks:
-            if task.uuid:
-                attr = curses.A_NORMAL
-                if task.uuid == self.selected_task:
-                    attr = curses.A_STANDOUT
-                win.addstr(y(), x, self.format_row(uuid,
-                    task.name, task.worker.hostname, task.state),
-                    attr)
-                if task.ready:
-                    task.visited = time.time()
-
+        win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
+        blank_line()
+        win.addstr(y(), x, self.format_row("UUID", "TASK",
+                                           "WORKER", "TIME", "STATE"),
+                curses.A_BOLD | curses.A_UNDERLINE)
+        tasks = self.tasks
+        if tasks:
+            for uuid, task in tasks:
+                if task.uuid:
+                    state_color = self.state_colors.get(task.state)
+                    attr = curses.A_NORMAL
+                    if task.uuid == self.selected_task:
+                        attr = curses.A_STANDOUT
+                    timestamp = datetime.fromtimestamp(task.timestamp)
+                    timef = timestamp.strftime("%H:%M:%S")
+                    line = self.format_row(uuid, task.name,
+                                           task.worker.hostname,
+                                           timef, task.state)
+                    lineno = y()
+                    win.addstr(lineno, x, line, attr)
+                    if state_color:
+                        win.addstr(lineno, len(line) - len(task.state) + 1,
+                                task.state, state_color | attr)
+                    if task.ready:
+                        task.visited = time.time()
+
+        # -- Footer
+        blank_line()
+        win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width)
+
+        # Selected Task Info
         if self.selected_task:
-            win.addstr(my - 4, x, self.selected_str, curses.A_BOLD)
+            win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
             info = "Missing extended info"
             try:
                 selection = self.state.tasks[self.selected_task]
@@ -178,26 +207,55 @@ class CursesMonitor(object):
                     info["runtime"] = "%.2fs" % info["runtime"]
                 info = " ".join("%s=%s" % (key, value)
                             for key, value in info.items())
-            win.addstr(my - 4, x + len(self.selected_str), info)
+            win.addstr(my - 5, x + len(self.selected_str), info)
         else:
-            win.addstr(my - 4, x, "No task selected", curses.A_NORMAL)
+            win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)
 
+
+        # Workers
         if self.workers:
-            win.addstr(my - 3, x, self.online_str, curses.A_BOLD)
-            win.addstr(my - 3, x + len(self.online_str),
+            win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
+            win.addstr(my - 4, x + len(self.online_str),
                     ", ".join(self.workers), curses.A_NORMAL)
         else:
-            win.addstr(my - 3, x, "No workers discovered.")
-        win.addstr(my - 2, x, self.help)
+            win.addstr(my - 4, x, "No workers discovered.")
+
+        # Info
+        win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
+        win.addstr(my - 3, x + len(self.info_str),
+                "events:%s tasks:%s workers:%s/%s" % (
+                    self.state.event_count, self.state.task_count,
+                    len([w for w in self.state.workers.values()
+                            if w.alive]),
+                    len(self.state.workers)),
+                curses.A_DIM)
+
+        # Help
+        win.addstr(my - 2, x, self.help_title, curses.A_BOLD)
+        win.addstr(my - 2, x + len(self.help_title), self.help, curses.A_DIM)
         win.refresh()
 
-    def setupscreen(self):
+    def init_screen(self):
         self.win = curses.initscr()
+        self.win.nodelay(True)
+        self.win.keypad(True)
         curses.start_color()
         curses.init_pair(1, self.foreground, self.background)
+        # exception states
+        curses.init_pair(2, curses.COLOR_RED, self.background)
+        # successful state
+        curses.init_pair(3, curses.COLOR_GREEN, self.background)
+        # revoked state
+        curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
+        # greeting
+        curses.init_pair(5, curses.COLOR_BLUE, self.background)
+
+        self.state_colors = {states.SUCCESS: curses.color_pair(3),
+                             states.REVOKED: curses.color_pair(4)}
+        for state in states.EXCEPTION_STATES:
+            self.state_colors[state] = curses.color_pair(2)
+
         curses.cbreak()
-        self.win.nodelay(True)
-        self.win.keypad(True)
 
     def resetscreen(self):
         curses.nocbreak()
@@ -231,7 +289,7 @@ def eventtop():
     sys.stderr.write("-> celeryev: starting capture...\n")
     state = State()
     display = CursesMonitor(state)
-    display.setupscreen()
+    display.init_screen()
     refresher = DisplayThread(display)
     refresher.start()
     conn = establish_connection()

+ 8 - 1
celery/events/state.py

@@ -34,7 +34,9 @@ class Worker(Thing):
 
 
 class Task(Thing):
-    _info_fields = ("args", "kwargs", "retries", "result", "eta", "runtime")
+    _info_fields = ("args", "kwargs", "retries",
+                    "result", "eta", "runtime",
+                    "exception")
     uuid = None
     name = None
     state = states.PENDING
@@ -93,6 +95,8 @@ class Task(Thing):
 
 
 class State(object):
+    event_count = 0
+    task_count = 0
 
     def __init__(self, callback=None):
         self.workers = {}
@@ -131,10 +135,13 @@ class State(object):
         worker = self.get_worker(hostname)
         task = self.get_task(uuid, worker=worker)
         handler = getattr(task, type)
+        if type == "received":
+            self.task_count += 1
         if handler:
             handler(**fields)
 
     def event(self, event):
+        self.event_count += 1
         group, _, type = partition(event.pop("type"), "-")
         self.group_handlers[group](type, event)
         if self.callback: