cursesmon.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. import celery
  2. import curses
  3. import sys
  4. import threading
  5. import time
  6. from datetime import datetime
  7. from itertools import count
  8. from textwrap import wrap
  9. from carrot.utils import rpartition
  10. from celery import states
  11. from celery.events import EventReceiver
  12. from celery.events.state import State
  13. from celery.messaging import establish_connection
  14. from celery.task import control
  15. from celery.utils import abbr, abbrtask
  16. class CursesMonitor(object):
  17. keymap = {}
  18. win = None
  19. screen_width = None
  20. screen_delay = 0.1
  21. selected_task = None
  22. selected_position = 0
  23. selected_str = "Selected: "
  24. limit = 20
  25. foreground = curses.COLOR_BLACK
  26. background = curses.COLOR_WHITE
  27. online_str = "Workers online: "
  28. help_title = "Keys: "
  29. help = ("j:up k:down i:info t:traceback r:result c:revoke ^c: quit")
  30. greet = "celeryev %s" % celery.__version__
  31. info_str = "Info: "
  32. def __init__(self, state, keymap=None):
  33. self.keymap = keymap or self.keymap
  34. self.state = state
  35. default_keymap = {"J": self.move_selection_down,
  36. "K": self.move_selection_up,
  37. "C": self.revoke_selection,
  38. "T": self.selection_traceback,
  39. "R": self.selection_result,
  40. "I": self.selection_info,
  41. "L": self.selection_rate_limit}
  42. self.keymap = dict(default_keymap, **self.keymap)
  43. def format_row(self, uuid, worker, task, timestamp, state):
  44. my, mx = self.win.getmaxyx()
  45. mx = mx - 3
  46. uuid_max = 36
  47. if mx < 88:
  48. uuid_max = mx - 52 - 2
  49. uuid = abbr(uuid, uuid_max).ljust(uuid_max)
  50. worker = abbr(worker, 16).ljust(16)
  51. task = abbrtask(task, 16).ljust(16)
  52. state = abbr(state, 8).ljust(8)
  53. timestamp = timestamp.ljust(8)
  54. row = "%s %s %s %s %s " % (uuid, worker, task, timestamp, state)
  55. if self.screen_width is None:
  56. self.screen_width = len(row[:mx])
  57. return row[:mx]
  58. def find_position(self):
  59. if not self.tasks:
  60. return 0
  61. for i, e in enumerate(self.tasks):
  62. if self.selected_task == e[0]:
  63. return i
  64. return 0
  65. def move_selection_up(self):
  66. self.move_selection(-1)
  67. def move_selection_down(self):
  68. self.move_selection(1)
  69. def move_selection(self, direction=1):
  70. if not self.tasks:
  71. return
  72. pos = self.find_position()
  73. try:
  74. self.selected_task = self.tasks[pos + direction][0]
  75. except IndexError:
  76. self.selected_task = self.tasks[0][0]
  77. keyalias = {curses.KEY_DOWN: "J",
  78. curses.KEY_UP: "K",
  79. curses.KEY_ENTER: "I"}
  80. def handle_keypress(self):
  81. try:
  82. key = self.win.getkey().upper()
  83. except:
  84. return
  85. key = self.keyalias.get(key) or key
  86. handler = self.keymap.get(key)
  87. if handler is not None:
  88. handler()
  89. def alert(self, callback, title=None):
  90. self.win.erase()
  91. my, mx = self.win.getmaxyx()
  92. y = blank_line = count(2).next
  93. if title:
  94. self.win.addstr(y(), 3, title, curses.A_BOLD | curses.A_UNDERLINE)
  95. blank_line()
  96. callback(my, mx, y())
  97. self.win.addstr(my - 1, 0, "Press any key to continue...",
  98. curses.A_BOLD)
  99. self.win.refresh()
  100. while 1:
  101. try:
  102. return self.win.getkey().upper()
  103. except:
  104. pass
  105. def selection_rate_limit(self):
  106. if not self.selected_task:
  107. return curses.beep()
  108. task = self.state.tasks[self.selected_task]
  109. if not task.name:
  110. return curses.beep()
  111. my, mx = self.win.getmaxyx()
  112. r = "New rate limit: "
  113. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  114. self.win.addstr(my - 2, len(r) + 3, " " * (mx - len(r)))
  115. rlimit = self.readline(my - 2, 3 + len(r))
  116. if rlimit:
  117. reply = control.rate_limit(task.name, rlimit.strip(), reply=True)
  118. self.alert_remote_control_reply(reply)
  119. def alert_remote_control_reply(self, reply):
  120. def callback(my, mx, xs):
  121. y = count(xs).next
  122. if not reply:
  123. self.win.addstr(y(), 3, "No replies received in 1s deadline.",
  124. curses.A_BOLD + curses.color_pair(2))
  125. return
  126. for subreply in reply:
  127. curline = y()
  128. host, response = subreply.items()[0]
  129. host = "%s: " % host
  130. self.win.addstr(curline, 3, host, curses.A_BOLD)
  131. attr = curses.A_NORMAL
  132. text = ""
  133. if "error" in response:
  134. text = response["error"]
  135. attr |= curses.color_pair(2)
  136. elif "ok" in response:
  137. text = response["ok"]
  138. attr |= curses.color_pair(3)
  139. self.win.addstr(curline, 3 + len(host), text, attr)
  140. return self.alert(callback, "Remote Control Command Replies")
  141. def readline(self, x, y):
  142. buffer = str()
  143. curses.echo()
  144. try:
  145. i = 0
  146. while True:
  147. ch = self.win.getch(x, y + i)
  148. if ch != -1:
  149. if ch in (10, curses.KEY_ENTER): # enter
  150. break
  151. if ch in (27, ):
  152. buffer = str()
  153. break
  154. buffer += chr(ch)
  155. i += 1
  156. finally:
  157. curses.noecho()
  158. return buffer
  159. def revoke_selection(self):
  160. if not self.selected_task:
  161. return curses.beep()
  162. reply = control.revoke(self.selected_task, reply=True)
  163. self.alert_remote_control_reply(reply)
  164. def selection_info(self):
  165. if not self.selected_task:
  166. return
  167. def alert_callback(mx, my, xs):
  168. y = count(xs).next
  169. task = self.state.tasks[self.selected_task]
  170. info = task.info(extra=["state"])
  171. infoitems = [("args", info.pop("args", None)),
  172. ("kwargs", info.pop("kwargs", None))] + info.items()
  173. for key, value in infoitems:
  174. if key is None:
  175. continue
  176. curline = y()
  177. keys = key + ": "
  178. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  179. wrapped = wrap(str(value), mx - 2)
  180. if len(wrapped) == 1:
  181. self.win.addstr(curline, len(keys) + 3, wrapped[0])
  182. else:
  183. for subline in wrapped:
  184. self.win.addstr(y(), 3, " " * 4 + subline,
  185. curses.A_NORMAL)
  186. return self.alert(alert_callback,
  187. "Task details for %s" % self.selected_task)
  188. def selection_traceback(self):
  189. if not self.selected_task:
  190. return curses.beep()
  191. task = self.state.tasks[self.selected_task]
  192. if task.state not in states.EXCEPTION_STATES:
  193. return curses.beep()
  194. def alert_callback(my, mx, xs):
  195. y = count(xs).next
  196. for line in task.traceback.split("\n"):
  197. self.win.addstr(y(), 3, line)
  198. return self.alert(alert_callback,
  199. "Task Exception Traceback for %s" % self.selected_task)
  200. def selection_result(self):
  201. if not self.selected_task:
  202. return
  203. def alert_callback(my, mx, xs):
  204. y = count(xs).next
  205. task = self.state.tasks[self.selected_task]
  206. result = getattr(task, "result", None) or getattr(task,
  207. "exception", None)
  208. for line in wrap(result, mx - 2):
  209. self.win.addstr(y(), 3, line)
  210. return self.alert(alert_callback,
  211. "Task Result for %s" % self.selected_task)
  212. def draw(self):
  213. win = self.win
  214. self.handle_keypress()
  215. x = 3
  216. y = blank_line = count(2).next
  217. my, mx = win.getmaxyx()
  218. win.erase()
  219. win.bkgd(" ", curses.color_pair(1))
  220. win.border()
  221. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  222. blank_line()
  223. win.addstr(y(), x, self.format_row("UUID", "TASK",
  224. "WORKER", "TIME", "STATE"),
  225. curses.A_BOLD | curses.A_UNDERLINE)
  226. tasks = self.tasks
  227. if tasks:
  228. for uuid, task in tasks:
  229. if task.uuid:
  230. state_color = self.state_colors.get(task.state)
  231. attr = curses.A_NORMAL
  232. if task.uuid == self.selected_task:
  233. attr = curses.A_STANDOUT
  234. timestamp = datetime.fromtimestamp(
  235. task.timestamp or time.time())
  236. timef = timestamp.strftime("%H:%M:%S")
  237. line = self.format_row(uuid, task.name,
  238. task.worker.hostname,
  239. timef, task.state)
  240. lineno = y()
  241. win.addstr(lineno, x, line, attr)
  242. if state_color:
  243. win.addstr(lineno, len(line) - len(task.state) + 1,
  244. task.state, state_color | attr)
  245. if task.ready:
  246. task.visited = time.time()
  247. # -- Footer
  248. blank_line()
  249. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width)
  250. # Selected Task Info
  251. if self.selected_task:
  252. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  253. info = "Missing extended info"
  254. try:
  255. selection = self.state.tasks[self.selected_task]
  256. except KeyError:
  257. pass
  258. else:
  259. info = selection.info(["args", "kwargs",
  260. "result", "runtime", "eta"])
  261. if "runtime" in info:
  262. info["runtime"] = "%.2fs" % info["runtime"]
  263. if "result" in info:
  264. info["result"] = abbr(info["result"], 16)
  265. info = " ".join("%s=%s" % (key, value)
  266. for key, value in info.items())
  267. win.addstr(my - 5, x + len(self.selected_str), info)
  268. else:
  269. win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)
  270. # Workers
  271. if self.workers:
  272. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  273. win.addstr(my - 4, x + len(self.online_str),
  274. ", ".join(self.workers), curses.A_NORMAL)
  275. else:
  276. win.addstr(my - 4, x, "No workers discovered.")
  277. # Info
  278. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  279. win.addstr(my - 3, x + len(self.info_str),
  280. "events:%s tasks:%s workers:%s/%s" % (
  281. self.state.event_count, self.state.task_count,
  282. len([w for w in self.state.workers.values()
  283. if w.alive]),
  284. len(self.state.workers)),
  285. curses.A_DIM)
  286. # Help
  287. win.addstr(my - 2, x, self.help_title, curses.A_BOLD)
  288. win.addstr(my - 2, x + len(self.help_title), self.help, curses.A_DIM)
  289. win.refresh()
  290. def init_screen(self):
  291. self.win = curses.initscr()
  292. self.win.nodelay(True)
  293. self.win.keypad(True)
  294. curses.start_color()
  295. curses.init_pair(1, self.foreground, self.background)
  296. # exception states
  297. curses.init_pair(2, curses.COLOR_RED, self.background)
  298. # successful state
  299. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  300. # revoked state
  301. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  302. # greeting
  303. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  304. # started state
  305. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  306. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  307. states.REVOKED: curses.color_pair(4),
  308. states.STARTED: curses.color_pair(6)}
  309. for state in states.EXCEPTION_STATES:
  310. self.state_colors[state] = curses.color_pair(2)
  311. curses.cbreak()
  312. def resetscreen(self):
  313. curses.nocbreak()
  314. self.win.keypad(False)
  315. curses.echo()
  316. curses.endwin()
  317. def nap(self):
  318. curses.napms(int(self.screen_delay * 1000))
  319. @property
  320. def tasks(self):
  321. return self.state.tasks_by_timestamp()[:self.limit]
  322. @property
  323. def workers(self):
  324. return [hostname
  325. for hostname, w in self.state.workers.items()
  326. if w.alive]
  327. class DisplayThread(threading.Thread):
  328. def __init__(self, display):
  329. self.display = display
  330. self.shutdown = False
  331. threading.Thread.__init__(self)
  332. def run(self):
  333. while not self.shutdown:
  334. self.display.draw()
  335. self.display.nap()
  336. def evtop():
  337. sys.stderr.write("-> evtop: starting capture...\n")
  338. state = State()
  339. display = CursesMonitor(state)
  340. display.init_screen()
  341. refresher = DisplayThread(display)
  342. refresher.start()
  343. conn = establish_connection()
  344. recv = EventReceiver(conn, handlers={"*": state.event})
  345. try:
  346. recv.capture(limit=None)
  347. except Exception:
  348. refresher.shutdown = True
  349. refresher.join()
  350. display.resetscreen()
  351. raise
  352. except (KeyboardInterrupt, SystemExit):
  353. conn and conn.close()
  354. refresher.shutdown = True
  355. refresher.join()
  356. display.resetscreen()
  357. if __name__ == "__main__":
  358. evtop()