cursesmon.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. import celery
  2. import curses
  3. import sys
  4. import threading
  5. import time
  6. from datetime import datetime
  7. from itertools import count
  8. from textwrap import wrap
  9. from celery import states
  10. from celery.events import EventReceiver
  11. from celery.events.state import State
  12. from celery.messaging import establish_connection
  13. from celery.task import control
  14. from celery.utils import abbr, abbrtask
  15. class CursesMonitor(object):
  16. keymap = {}
  17. win = None
  18. screen_width = None
  19. screen_delay = 10
  20. selected_task = None
  21. selected_position = 0
  22. selected_str = "Selected: "
  23. limit = 20
  24. foreground = curses.COLOR_BLACK
  25. background = curses.COLOR_WHITE
  26. online_str = "Workers online: "
  27. help_title = "Keys: "
  28. help = ("j:up k:down i:info t:traceback r:result c:revoke ^c: quit")
  29. greet = "celeryev %s" % celery.__version__
  30. info_str = "Info: "
  31. def __init__(self, state, keymap=None):
  32. self.keymap = keymap or self.keymap
  33. self.state = state
  34. default_keymap = {"J": self.move_selection_down,
  35. "K": self.move_selection_up,
  36. "C": self.revoke_selection,
  37. "T": self.selection_traceback,
  38. "R": self.selection_result,
  39. "I": self.selection_info,
  40. "L": self.selection_rate_limit}
  41. self.keymap = dict(default_keymap, **self.keymap)
  42. def format_row(self, uuid, worker, task, timestamp, state):
  43. my, mx = self.win.getmaxyx()
  44. mx = mx - 3
  45. uuid_max = 36
  46. if mx < 88:
  47. uuid_max = mx - 52 - 2
  48. uuid = abbr(uuid, uuid_max).ljust(uuid_max)
  49. worker = abbr(worker, 16).ljust(16)
  50. task = abbrtask(task, 16).ljust(16)
  51. state = abbr(state, 8).ljust(8)
  52. timestamp = timestamp.ljust(8)
  53. row = "%s %s %s %s %s " % (uuid, worker, task, timestamp, state)
  54. if self.screen_width is None:
  55. self.screen_width = len(row[:mx])
  56. return row[:mx]
  57. def find_position(self):
  58. if not self.tasks:
  59. return 0
  60. for i, e in enumerate(self.tasks):
  61. if self.selected_task == e[0]:
  62. return i
  63. return 0
  64. def move_selection_up(self):
  65. self.move_selection(-1)
  66. def move_selection_down(self):
  67. self.move_selection(1)
  68. def move_selection(self, direction=1):
  69. if not self.tasks:
  70. return
  71. pos = self.find_position()
  72. try:
  73. self.selected_task = self.tasks[pos + direction][0]
  74. except IndexError:
  75. self.selected_task = self.tasks[0][0]
  76. keyalias = {curses.KEY_DOWN: "J",
  77. curses.KEY_UP: "K",
  78. curses.KEY_ENTER: "I"}
  79. def handle_keypress(self):
  80. try:
  81. key = self.win.getkey().upper()
  82. except:
  83. return
  84. key = self.keyalias.get(key) or key
  85. handler = self.keymap.get(key)
  86. if handler is not None:
  87. handler()
  88. def alert(self, callback, title=None):
  89. self.win.erase()
  90. my, mx = self.win.getmaxyx()
  91. y = blank_line = count(2).next
  92. if title:
  93. self.win.addstr(y(), 3, title, curses.A_BOLD | curses.A_UNDERLINE)
  94. blank_line()
  95. callback(my, mx, y())
  96. self.win.addstr(my - 1, 0, "Press any key to continue...",
  97. curses.A_BOLD)
  98. self.win.refresh()
  99. while 1:
  100. try:
  101. return self.win.getkey().upper()
  102. except:
  103. pass
  104. def selection_rate_limit(self):
  105. if not self.selected_task:
  106. return curses.beep()
  107. task = self.state.tasks[self.selected_task]
  108. if not task.name:
  109. return curses.beep()
  110. my, mx = self.win.getmaxyx()
  111. r = "New rate limit: "
  112. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  113. self.win.addstr(my - 2, len(r) + 3, " " * (mx - len(r)))
  114. rlimit = self.readline(my - 2, 3 + len(r))
  115. if rlimit:
  116. reply = control.rate_limit(task.name, rlimit.strip(), reply=True)
  117. self.alert_remote_control_reply(reply)
  118. def alert_remote_control_reply(self, reply):
  119. def callback(my, mx, xs):
  120. y = count(xs).next
  121. if not reply:
  122. self.win.addstr(y(), 3, "No replies received in 1s deadline.",
  123. curses.A_BOLD + curses.color_pair(2))
  124. return
  125. for subreply in reply:
  126. curline = y()
  127. host, response = subreply.items()[0]
  128. host = "%s: " % host
  129. self.win.addstr(curline, 3, host, curses.A_BOLD)
  130. attr = curses.A_NORMAL
  131. text = ""
  132. if "error" in response:
  133. text = response["error"]
  134. attr |= curses.color_pair(2)
  135. elif "ok" in response:
  136. text = response["ok"]
  137. attr |= curses.color_pair(3)
  138. self.win.addstr(curline, 3 + len(host), text, attr)
  139. return self.alert(callback, "Remote Control Command Replies")
  140. def readline(self, x, y):
  141. buffer = str()
  142. curses.echo()
  143. try:
  144. i = 0
  145. while True:
  146. ch = self.win.getch(x, y + i)
  147. if ch != -1:
  148. if ch in (10, curses.KEY_ENTER): # enter
  149. break
  150. if ch in (27, ):
  151. buffer = str()
  152. break
  153. buffer += chr(ch)
  154. i += 1
  155. finally:
  156. curses.noecho()
  157. return buffer
  158. def revoke_selection(self):
  159. if not self.selected_task:
  160. return curses.beep()
  161. reply = control.revoke(self.selected_task, reply=True)
  162. self.alert_remote_control_reply(reply)
  163. def selection_info(self):
  164. if not self.selected_task:
  165. return
  166. def alert_callback(mx, my, xs):
  167. my, mx = self.win.getmaxyx()
  168. y = count(xs).next
  169. task = self.state.tasks[self.selected_task]
  170. info = task.info(extra=["state"])
  171. infoitems = [("args", info.pop("args", None)),
  172. ("kwargs", info.pop("kwargs", None))] + info.items()
  173. for key, value in infoitems:
  174. if key is None:
  175. continue
  176. value = str(value)
  177. curline = y()
  178. keys = key + ": "
  179. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  180. wrapped = wrap(value, mx - 2)
  181. if len(wrapped) == 1:
  182. self.win.addstr(curline, len(keys) + 3,
  183. abbr(wrapped[0],
  184. self.screen_width - (len(keys) + 3)))
  185. else:
  186. for subline in wrapped:
  187. nexty = y()
  188. if nexty >= my - 1:
  189. subline = " " * 4 + "[...]"
  190. elif nexty >= my:
  191. break
  192. self.win.addstr(nexty, 3,
  193. abbr(" " * 4 + subline, self.screen_width - 4),
  194. curses.A_NORMAL)
  195. return self.alert(alert_callback,
  196. "Task details for %s" % self.selected_task)
  197. def selection_traceback(self):
  198. if not self.selected_task:
  199. return curses.beep()
  200. task = self.state.tasks[self.selected_task]
  201. if task.state not in states.EXCEPTION_STATES:
  202. return curses.beep()
  203. def alert_callback(my, mx, xs):
  204. y = count(xs).next
  205. for line in task.traceback.split("\n"):
  206. self.win.addstr(y(), 3, line)
  207. return self.alert(alert_callback,
  208. "Task Exception Traceback for %s" % self.selected_task)
  209. def selection_result(self):
  210. if not self.selected_task:
  211. return
  212. def alert_callback(my, mx, xs):
  213. y = count(xs).next
  214. task = self.state.tasks[self.selected_task]
  215. result = getattr(task, "result", None) or getattr(task,
  216. "exception", None)
  217. for line in wrap(result, mx - 2):
  218. self.win.addstr(y(), 3, line)
  219. return self.alert(alert_callback,
  220. "Task Result for %s" % self.selected_task)
  221. def draw(self):
  222. win = self.win
  223. self.handle_keypress()
  224. x = 3
  225. y = blank_line = count(2).next
  226. my, mx = win.getmaxyx()
  227. win.erase()
  228. win.bkgd(" ", curses.color_pair(1))
  229. win.border()
  230. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  231. blank_line()
  232. win.addstr(y(), x, self.format_row("UUID", "TASK",
  233. "WORKER", "TIME", "STATE"),
  234. curses.A_BOLD | curses.A_UNDERLINE)
  235. tasks = self.tasks
  236. if tasks:
  237. for uuid, task in tasks:
  238. if task.uuid:
  239. state_color = self.state_colors.get(task.state)
  240. attr = curses.A_NORMAL
  241. if task.uuid == self.selected_task:
  242. attr = curses.A_STANDOUT
  243. timestamp = datetime.fromtimestamp(
  244. task.timestamp or time.time())
  245. timef = timestamp.strftime("%H:%M:%S")
  246. line = self.format_row(uuid, task.name,
  247. task.worker.hostname,
  248. timef, task.state)
  249. lineno = y()
  250. win.addstr(lineno, x, line, attr)
  251. if state_color:
  252. win.addstr(lineno, len(line) - len(task.state) + 1,
  253. task.state, state_color | attr)
  254. if task.ready:
  255. task.visited = time.time()
  256. # -- Footer
  257. blank_line()
  258. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width)
  259. # Selected Task Info
  260. if self.selected_task:
  261. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  262. info = "Missing extended info"
  263. try:
  264. selection = self.state.tasks[self.selected_task]
  265. except KeyError:
  266. pass
  267. else:
  268. info = selection.info(["args", "kwargs",
  269. "result", "runtime", "eta"])
  270. if "runtime" in info:
  271. info["runtime"] = "%.2fs" % info["runtime"]
  272. if "result" in info:
  273. info["result"] = abbr(info["result"], 16)
  274. info = " ".join("%s=%s" % (key, value)
  275. for key, value in info.items())
  276. detail = "... -> key i"
  277. infowin = abbr(info,
  278. self.screen_width - len(self.selected_str) - 2,
  279. detail)
  280. win.addstr(my - 5, x + len(self.selected_str), infowin)
  281. # Make ellipsis bold
  282. if detail in infowin:
  283. detailpos = len(infowin) - len(detail)
  284. win.addstr(my - 5, x + len(self.selected_str) + detailpos,
  285. detail, curses.A_BOLD)
  286. else:
  287. win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)
  288. # Workers
  289. if self.workers:
  290. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  291. win.addstr(my - 4, x + len(self.online_str),
  292. ", ".join(self.workers), curses.A_NORMAL)
  293. else:
  294. win.addstr(my - 4, x, "No workers discovered.")
  295. # Info
  296. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  297. win.addstr(my - 3, x + len(self.info_str),
  298. "events:%s tasks:%s workers:%s/%s" % (
  299. self.state.event_count, self.state.task_count,
  300. len([w for w in self.state.workers.values()
  301. if w.alive]),
  302. len(self.state.workers)),
  303. curses.A_DIM)
  304. # Help
  305. win.addstr(my - 2, x, self.help_title, curses.A_BOLD)
  306. win.addstr(my - 2, x + len(self.help_title), self.help, curses.A_DIM)
  307. win.refresh()
  308. def init_screen(self):
  309. self.win = curses.initscr()
  310. self.win.nodelay(True)
  311. self.win.keypad(True)
  312. curses.start_color()
  313. curses.init_pair(1, self.foreground, self.background)
  314. # exception states
  315. curses.init_pair(2, curses.COLOR_RED, self.background)
  316. # successful state
  317. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  318. # revoked state
  319. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  320. # greeting
  321. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  322. # started state
  323. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  324. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  325. states.REVOKED: curses.color_pair(4),
  326. states.STARTED: curses.color_pair(6)}
  327. for state in states.EXCEPTION_STATES:
  328. self.state_colors[state] = curses.color_pair(2)
  329. curses.cbreak()
  330. def resetscreen(self):
  331. curses.nocbreak()
  332. self.win.keypad(False)
  333. curses.echo()
  334. curses.endwin()
  335. def nap(self):
  336. curses.napms(self.screen_delay)
  337. @property
  338. def tasks(self):
  339. return self.state.tasks_by_timestamp()[:self.limit]
  340. @property
  341. def workers(self):
  342. return [hostname
  343. for hostname, w in self.state.workers.items()
  344. if w.alive]
  345. class DisplayThread(threading.Thread):
  346. def __init__(self, display):
  347. self.display = display
  348. self.shutdown = False
  349. threading.Thread.__init__(self)
  350. def run(self):
  351. while not self.shutdown:
  352. self.display.draw()
  353. self.display.nap()
  354. def evtop():
  355. sys.stderr.write("-> evtop: starting capture...\n")
  356. state = State()
  357. display = CursesMonitor(state)
  358. display.init_screen()
  359. refresher = DisplayThread(display)
  360. refresher.start()
  361. conn = establish_connection()
  362. recv = EventReceiver(conn, handlers={"*": state.event})
  363. try:
  364. recv.capture(limit=None)
  365. except Exception:
  366. refresher.shutdown = True
  367. refresher.join()
  368. display.resetscreen()
  369. raise
  370. except (KeyboardInterrupt, SystemExit):
  371. conn and conn.close()
  372. refresher.shutdown = True
  373. refresher.join()
  374. display.resetscreen()
  375. if __name__ == "__main__":
  376. evtop()