Explorar o código

celeryev: Added 'l' to set rate limits, refactored keypress handler and added remote control reply windows.

Ask Solem %!s(int64=15) %!d(string=hai) anos
pai
achega
c16d43b9da
Modificáronse 1 ficheiros con 117 adicións e 34 borrados
  1. 117 34
      celery/bin/celeryev.py

+ 117 - 34
celery/bin/celeryev.py

@@ -6,6 +6,7 @@ import socket
 import optparse
 import threading
 
+from pprint import pformat
 from datetime import datetime
 from textwrap import wrap
 from itertools import count
@@ -87,6 +88,7 @@ def abbrtask(S, max):
 
 
 class CursesMonitor(object):
+    keymap = {}
     win = None
     screen_width = None
     screen_delay = 0.1
@@ -102,8 +104,17 @@ class CursesMonitor(object):
     greet = "celeryev %s" % celery.__version__
     info_str = "Info: "
 
-    def __init__(self, state):
+    def __init__(self, state, keymap=None):
+        self.keymap = keymap or self.keymap
         self.state = state
+        default_keymap = {"J": self.move_selection_down,
+                          "K": self.move_selection_up,
+                          "C": self.revoke_selection,
+                          "T": self.selection_traceback,
+                          "R": self.selection_result,
+                          "I": self.selection_info,
+                          "L": self.selection_rate_limit}
+        self.keymap = dict(default_keymap, **self.keymap)
 
     def format_row(self, uuid, worker, task, time, state):
         my, mx = self.win.getmaxyx()
@@ -129,38 +140,43 @@ class CursesMonitor(object):
                 return i
         return 0
 
-    def move_selection(self, reverse=False):
+    def move_selection_up(self):
+        self.move_selection(-1)
+
+    def move_selection_down(self):
+        self.move_selection(1)
+
+    def move_selection(self, direction=1):
         if not self.tasks:
             return
-        incr = reverse and -1 or 1
         pos = self.find_position()
         try:
-            self.selected_task = self.tasks[pos + incr][0]
+            self.selected_task = self.tasks[pos + direction][0]
         except IndexError:
-            self.selected_task = None
+            self.selected_task = self.tasks[0][0]
+
+    keyalias = {curses.KEY_DOWN: "J",
+                curses.KEY_UP: "K",
+                curses.KEY_ENTER: "I"}
 
     def handle_keypress(self):
         try:
             key = self.win.getkey().upper()
         except:
             return
-        if key in (curses.KEY_DOWN, "J"):
-            self.move_selection()
-        elif key in (curses.KEY_UP, "K"):
-            self.move_selection(reverse=True)
-        elif key in ("C", ):
-            self.revoke_selection()
-        elif key in ("T", ):
-            self.selection_traceback()
-        elif key in ("R", ):
-            self.selection_result()
-        elif key in ("I", curses.KEY_ENTER):
-            self.selection_info()
-
-    def alert(self, callback):
+        key = self.keyalias.get(key) or key
+        handler = self.keymap.get(key)
+        if handler is not None:
+            handler()
+
+    def alert(self, callback, title=None):
         self.win.erase()
         my, mx = self.win.getmaxyx()
-        callback(my, mx)
+        y = blank_line = count(2).next
+        if title:
+            self.win.addstr(y(), 3, title, curses.A_BOLD | curses.A_UNDERLINE)
+            blank_line()
+        callback(my, mx, y())
         self.win.addstr(my - 1, 0, "Press any key to continue...", curses.A_BOLD)
         self.win.refresh()
         while 1:
@@ -169,15 +185,81 @@ class CursesMonitor(object):
             except:
                 pass
 
+    def selection_rate_limit(self):
+        if not self.selected_task:
+            return curses.beep()
+        task = self.state.tasks[self.selected_task]
+        if not task.name:
+            return curses.beep()
+
+        my, mx = self.win.getmaxyx()
+        r = "New rate limit: "
+        self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
+        self.win.addstr(my - 2, len(r) + 3, " " * (mx - len(r)))
+        rlimit = self.readline(my - 2, 3 + len(r))
+
+        if rlimit:
+            reply = control.rate_limit(task.name, rlimit.strip(), reply=True)
+            self.alert_remote_control_reply(reply)
+
+    def alert_remote_control_reply(self, reply):
+
+        def callback(my, mx, xs):
+            y = count(xs).next
+            if not reply:
+                self.win.addstr(y(), 3, "No replies received in 1s deadline.",
+                        curses.A_BOLD + curses.color_pair(2))
+                return
+
+            for subreply in reply:
+                curline = y()
+
+                host, response = subreply.items()[0]
+                host = "%s: " % host
+                self.win.addstr(curline, 3, host, curses.A_BOLD)
+                attr = curses.A_NORMAL
+                text = ""
+                if "error" in response:
+                    text = response["error"]
+                    attr |= curses.color_pair(2)
+                elif "ok" in response:
+                    text = response["ok"]
+                    attr |= curses.color_pair(3)
+                self.win.addstr(curline, 3 + len(host), text, attr)
+
+        return self.alert(callback, "Remote Control Command Replies")
+
+    def readline(self, x, y):
+        buffer = str()
+        curses.echo()
+        try:
+            i = 0
+            while True:
+                ch = self.win.getch(x, y + i)
+                if ch != -1:
+                    if ch in (10, curses.KEY_ENTER): # enter
+                        break
+                    if ch in (27, ):
+                        buffer = str()
+                        break
+                    buffer += chr(ch)
+                    i += 1
+        finally:
+            curses.noecho()
+        return buffer
+
     def revoke_selection(self):
-        control.revoke(self.selected_task)
+        if not self.selected_task:
+            return curses.beep()
+        reply = control.revoke(self.selected_task, reply=True)
+        self.alert_remote_control_reply(reply)
 
     def selection_info(self):
         if not self.selected_task:
             return
 
-        def alert_callback(mx, my):
-            y = count(2).next
+        def alert_callback(mx, my, xs):
+            y = count(xs).next
             task = self.state.tasks[self.selected_task]
             info = task.info(extra=["state"])
             infoitems = [("args", info.pop("args", None)),
@@ -196,37 +278,38 @@ class CursesMonitor(object):
                         self.win.addstr(y(), 3, " " * 4 + subline,
                                 curses.A_NORMAL)
 
-        return self.alert(alert_callback)
+        return self.alert(alert_callback,
+                "Task details for %s" % self.selected_task)
 
     def selection_traceback(self):
         if not self.selected_task:
-            return
+            return curses.beep()
         task = self.state.tasks[self.selected_task]
         if task.state not in states.EXCEPTION_STATES:
-            return
+            return curses.beep()
 
-        def alert_callback(my, mx):
-            y = count(2).next
+        def alert_callback(my, mx, xs):
+            y = count(xs).next
             for line in task.traceback.split("\n"):
                 self.win.addstr(y(), 3, line)
 
-        return self.alert(alert_callback)
+        return self.alert(alert_callback,
+                "Task Exception Traceback for %s" % self.selected_task)
 
     def selection_result(self):
         if not self.selected_task:
             return
 
-        def alert_callback(my, mx):
-            y = count(2).next
+        def alert_callback(my, mx, xs):
+            y = count(xs).next
             task = self.state.tasks[self.selected_task]
             result = getattr(task, "result", None) or getattr(task,
                     "exception", None)
             for line in wrap(result, mx - 2):
                 self.win.addstr(y(), 3, line)
 
-        return self.alert(alert_callback)
-
-
+        return self.alert(alert_callback,
+                "Task Result for %s" % self.selected_task)
 
     def draw(self):
         win = self.win