cursesmon.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. import celery
  2. import curses
  3. import sys
  4. import threading
  5. import time
  6. from datetime import datetime
  7. from itertools import count
  8. from textwrap import wrap
  9. from math import ceil
  10. from celery import states
  11. from celery.app import app_or_default
  12. from celery.utils import abbr, abbrtask
  13. BORDER_SPACING = 4
  14. LEFT_BORDER_OFFSET = 3
  15. UUID_WIDTH = 36
  16. STATE_WIDTH = 8
  17. TIMESTAMP_WIDTH = 8
  18. MIN_WORKER_WIDTH = 15
  19. MIN_TASK_WIDTH = 16
  20. class CursesMonitor(object):
  21. keymap = {}
  22. win = None
  23. screen_width = None
  24. screen_delay = 10
  25. selected_task = None
  26. selected_position = 0
  27. selected_str = "Selected: "
  28. foreground = curses.COLOR_BLACK
  29. background = curses.COLOR_WHITE
  30. online_str = "Workers online: "
  31. help_title = "Keys: "
  32. help = ("j:up k:down i:info t:traceback r:result c:revoke ^c: quit")
  33. greet = "celeryev %s" % celery.__version__
  34. info_str = "Info: "
  35. def __init__(self, state, keymap=None, app=None):
  36. self.app = app_or_default(app)
  37. self.keymap = keymap or self.keymap
  38. self.state = state
  39. default_keymap = {"J": self.move_selection_down,
  40. "K": self.move_selection_up,
  41. "C": self.revoke_selection,
  42. "T": self.selection_traceback,
  43. "R": self.selection_result,
  44. "I": self.selection_info,
  45. "L": self.selection_rate_limit}
  46. self.keymap = dict(default_keymap, **self.keymap)
  47. def format_row(self, uuid, task, worker, timestamp, state):
  48. mx = self.display_width
  49. # include spacing
  50. detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
  51. uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH
  52. if uuid_space < UUID_WIDTH:
  53. uuid_width = uuid_space
  54. else:
  55. uuid_width = UUID_WIDTH
  56. detail_width = detail_width - uuid_width - 1
  57. task_width = int(ceil(detail_width / 2.0))
  58. worker_width = detail_width - task_width - 1
  59. uuid = abbr(uuid, uuid_width).ljust(uuid_width)
  60. worker = abbr(worker, worker_width).ljust(worker_width)
  61. task = abbrtask(task, task_width).ljust(task_width)
  62. state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
  63. timestamp = timestamp.ljust(TIMESTAMP_WIDTH)
  64. row = "%s %s %s %s %s " % (uuid, worker, task, timestamp, state)
  65. if self.screen_width is None:
  66. self.screen_width = len(row[:mx])
  67. return row[:mx]
  68. @property
  69. def screen_width(self):
  70. _, mx = self.win.getmaxyx()
  71. return mx
  72. @property
  73. def screen_height(self):
  74. my, _ = self.win.getmaxyx()
  75. return my
  76. @property
  77. def display_width(self):
  78. _, mx = self.win.getmaxyx()
  79. return mx - BORDER_SPACING
  80. @property
  81. def display_height(self):
  82. my, _ = self.win.getmaxyx()
  83. return my - 10
  84. @property
  85. def limit(self):
  86. return self.display_height
  87. def find_position(self):
  88. if not self.tasks:
  89. return 0
  90. for i, e in enumerate(self.tasks):
  91. if self.selected_task == e[0]:
  92. return i
  93. return 0
  94. def move_selection_up(self):
  95. self.move_selection(-1)
  96. def move_selection_down(self):
  97. self.move_selection(1)
  98. def move_selection(self, direction=1):
  99. if not self.tasks:
  100. return
  101. pos = self.find_position()
  102. try:
  103. self.selected_task = self.tasks[pos + direction][0]
  104. except IndexError:
  105. self.selected_task = self.tasks[0][0]
  106. keyalias = {curses.KEY_DOWN: "J",
  107. curses.KEY_UP: "K",
  108. curses.KEY_ENTER: "I"}
  109. def handle_keypress(self):
  110. try:
  111. key = self.win.getkey().upper()
  112. except:
  113. return
  114. key = self.keyalias.get(key) or key
  115. handler = self.keymap.get(key)
  116. if handler is not None:
  117. handler()
  118. def alert(self, callback, title=None):
  119. self.win.erase()
  120. my, mx = self.win.getmaxyx()
  121. y = blank_line = count(2).next
  122. if title:
  123. self.win.addstr(y(), 3, title, curses.A_BOLD | curses.A_UNDERLINE)
  124. blank_line()
  125. callback(my, mx, y())
  126. self.win.addstr(my - 1, 0, "Press any key to continue...",
  127. curses.A_BOLD)
  128. self.win.refresh()
  129. while 1:
  130. try:
  131. return self.win.getkey().upper()
  132. except:
  133. pass
  134. def selection_rate_limit(self):
  135. if not self.selected_task:
  136. return curses.beep()
  137. task = self.state.tasks[self.selected_task]
  138. if not task.name:
  139. return curses.beep()
  140. my, mx = self.win.getmaxyx()
  141. r = "New rate limit: "
  142. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  143. self.win.addstr(my - 2, len(r) + 3, " " * (mx - len(r)))
  144. rlimit = self.readline(my - 2, 3 + len(r))
  145. if rlimit:
  146. reply = self.app.control.rate_limit(task.name,
  147. rlimit.strip(), reply=True)
  148. self.alert_remote_control_reply(reply)
  149. def alert_remote_control_reply(self, reply):
  150. def callback(my, mx, xs):
  151. y = count(xs).next
  152. if not reply:
  153. self.win.addstr(y(), 3, "No replies received in 1s deadline.",
  154. curses.A_BOLD + curses.color_pair(2))
  155. return
  156. for subreply in reply:
  157. curline = y()
  158. host, response = subreply.items()[0]
  159. host = "%s: " % host
  160. self.win.addstr(curline, 3, host, curses.A_BOLD)
  161. attr = curses.A_NORMAL
  162. text = ""
  163. if "error" in response:
  164. text = response["error"]
  165. attr |= curses.color_pair(2)
  166. elif "ok" in response:
  167. text = response["ok"]
  168. attr |= curses.color_pair(3)
  169. self.win.addstr(curline, 3 + len(host), text, attr)
  170. return self.alert(callback, "Remote Control Command Replies")
  171. def readline(self, x, y):
  172. buffer = str()
  173. curses.echo()
  174. try:
  175. i = 0
  176. while True:
  177. ch = self.win.getch(x, y + i)
  178. if ch != -1:
  179. if ch in (10, curses.KEY_ENTER): # enter
  180. break
  181. if ch in (27, ):
  182. buffer = str()
  183. break
  184. buffer += chr(ch)
  185. i += 1
  186. finally:
  187. curses.noecho()
  188. return buffer
  189. def revoke_selection(self):
  190. if not self.selected_task:
  191. return curses.beep()
  192. reply = self.app.control.revoke(self.selected_task, reply=True)
  193. self.alert_remote_control_reply(reply)
  194. def selection_info(self):
  195. if not self.selected_task:
  196. return
  197. def alert_callback(mx, my, xs):
  198. my, mx = self.win.getmaxyx()
  199. y = count(xs).next
  200. task = self.state.tasks[self.selected_task]
  201. info = task.info(extra=["state"])
  202. infoitems = [("args", info.pop("args", None)),
  203. ("kwargs", info.pop("kwargs", None))] + info.items()
  204. for key, value in infoitems:
  205. if key is None:
  206. continue
  207. value = str(value)
  208. curline = y()
  209. keys = key + ": "
  210. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  211. wrapped = wrap(value, mx - 2)
  212. if len(wrapped) == 1:
  213. self.win.addstr(curline, len(keys) + 3,
  214. abbr(wrapped[0],
  215. self.screen_width - (len(keys) + 3)))
  216. else:
  217. for subline in wrapped:
  218. nexty = y()
  219. if nexty >= my - 1:
  220. subline = " " * 4 + "[...]"
  221. elif nexty >= my:
  222. break
  223. self.win.addstr(nexty, 3,
  224. abbr(" " * 4 + subline, self.screen_width - 4),
  225. curses.A_NORMAL)
  226. return self.alert(alert_callback,
  227. "Task details for %s" % self.selected_task)
  228. def selection_traceback(self):
  229. if not self.selected_task:
  230. return curses.beep()
  231. task = self.state.tasks[self.selected_task]
  232. if task.state not in states.EXCEPTION_STATES:
  233. return curses.beep()
  234. def alert_callback(my, mx, xs):
  235. y = count(xs).next
  236. for line in task.traceback.split("\n"):
  237. self.win.addstr(y(), 3, line)
  238. return self.alert(alert_callback,
  239. "Task Exception Traceback for %s" % self.selected_task)
  240. def selection_result(self):
  241. if not self.selected_task:
  242. return
  243. def alert_callback(my, mx, xs):
  244. y = count(xs).next
  245. task = self.state.tasks[self.selected_task]
  246. result = getattr(task, "result", None) or getattr(task,
  247. "exception", None)
  248. for line in wrap(result, mx - 2):
  249. self.win.addstr(y(), 3, line)
  250. return self.alert(alert_callback,
  251. "Task Result for %s" % self.selected_task)
  252. def display_task_row(self, lineno, task):
  253. state_color = self.state_colors.get(task.state)
  254. attr = curses.A_NORMAL
  255. if task.uuid == self.selected_task:
  256. attr = curses.A_STANDOUT
  257. timestamp = datetime.fromtimestamp(
  258. task.timestamp or time.time())
  259. timef = timestamp.strftime("%H:%M:%S")
  260. line = self.format_row(task.uuid, task.name,
  261. task.worker.hostname,
  262. timef, task.state)
  263. self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)
  264. if state_color:
  265. self.win.addstr(lineno,
  266. len(line) - STATE_WIDTH + BORDER_SPACING - 1,
  267. task.state, state_color | attr)
  268. def draw(self):
  269. win = self.win
  270. self.handle_keypress()
  271. x = LEFT_BORDER_OFFSET
  272. y = blank_line = count(2).next
  273. my, mx = win.getmaxyx()
  274. win.erase()
  275. win.bkgd(" ", curses.color_pair(1))
  276. win.border()
  277. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  278. blank_line()
  279. win.addstr(y(), x, self.format_row("UUID", "TASK",
  280. "WORKER", "TIME", "STATE"),
  281. curses.A_BOLD | curses.A_UNDERLINE)
  282. tasks = self.tasks
  283. if tasks:
  284. for row, (uuid, task) in enumerate(tasks):
  285. if row > self.display_height:
  286. break
  287. if task.uuid:
  288. lineno = y()
  289. self.display_task_row(lineno, task)
  290. # -- Footer
  291. blank_line()
  292. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)
  293. # Selected Task Info
  294. if self.selected_task:
  295. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  296. info = "Missing extended info"
  297. detail = ""
  298. try:
  299. selection = self.state.tasks[self.selected_task]
  300. except KeyError:
  301. pass
  302. else:
  303. info = selection.info(["args", "kwargs",
  304. "result", "runtime", "eta"])
  305. if "runtime" in info:
  306. info["runtime"] = "%.2fs" % info["runtime"]
  307. if "result" in info:
  308. info["result"] = abbr(info["result"], 16)
  309. info = " ".join("%s=%s" % (key, value)
  310. for key, value in info.items())
  311. detail = "... -> key i"
  312. infowin = abbr(info,
  313. self.screen_width - len(self.selected_str) - 2,
  314. detail)
  315. win.addstr(my - 5, x + len(self.selected_str), infowin)
  316. # Make ellipsis bold
  317. if detail in infowin:
  318. detailpos = len(infowin) - len(detail)
  319. win.addstr(my - 5, x + len(self.selected_str) + detailpos,
  320. detail, curses.A_BOLD)
  321. else:
  322. win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)
  323. # Workers
  324. if self.workers:
  325. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  326. win.addstr(my - 4, x + len(self.online_str),
  327. ", ".join(sorted(self.workers)), curses.A_NORMAL)
  328. else:
  329. win.addstr(my - 4, x, "No workers discovered.")
  330. # Info
  331. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  332. win.addstr(my - 3, x + len(self.info_str),
  333. "events:%s tasks:%s workers:%s/%s" % (
  334. self.state.event_count, self.state.task_count,
  335. len([w for w in self.state.workers.values()
  336. if w.alive]),
  337. len(self.state.workers)),
  338. curses.A_DIM)
  339. # Help
  340. self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
  341. self.safe_add_str(my - 2, x + len(self.help_title), self.help,
  342. curses.A_DIM)
  343. win.refresh()
  344. def safe_add_str(self, y, x, string, *args, **kwargs):
  345. if x + len(string) > self.screen_width:
  346. string = string[:self.screen_width - x]
  347. self.win.addstr(y, x, string, *args, **kwargs)
  348. def init_screen(self):
  349. self.win = curses.initscr()
  350. self.win.nodelay(True)
  351. self.win.keypad(True)
  352. curses.start_color()
  353. curses.init_pair(1, self.foreground, self.background)
  354. # exception states
  355. curses.init_pair(2, curses.COLOR_RED, self.background)
  356. # successful state
  357. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  358. # revoked state
  359. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  360. # greeting
  361. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  362. # started state
  363. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  364. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  365. states.REVOKED: curses.color_pair(4),
  366. states.STARTED: curses.color_pair(6)}
  367. for state in states.EXCEPTION_STATES:
  368. self.state_colors[state] = curses.color_pair(2)
  369. curses.cbreak()
  370. def resetscreen(self):
  371. curses.nocbreak()
  372. self.win.keypad(False)
  373. curses.echo()
  374. curses.endwin()
  375. def nap(self):
  376. curses.napms(self.screen_delay)
  377. @property
  378. def tasks(self):
  379. return self.state.tasks_by_timestamp()[:self.limit]
  380. @property
  381. def workers(self):
  382. return [hostname
  383. for hostname, w in self.state.workers.items()
  384. if w.alive]
  385. class DisplayThread(threading.Thread):
  386. def __init__(self, display):
  387. self.display = display
  388. self.shutdown = False
  389. threading.Thread.__init__(self)
  390. def run(self):
  391. while not self.shutdown:
  392. self.display.draw()
  393. self.display.nap()
  394. def evtop(app=None):
  395. sys.stderr.write("-> evtop: starting capture...\n")
  396. app = app_or_default(app)
  397. state = app.events.State()
  398. conn = app.broker_connection()
  399. recv = app.events.Receiver(conn, handlers={"*": state.event})
  400. capture = recv.itercapture()
  401. capture.next()
  402. display = CursesMonitor(state, app=app)
  403. display.init_screen()
  404. refresher = DisplayThread(display)
  405. refresher.start()
  406. try:
  407. capture.next()
  408. except Exception:
  409. refresher.shutdown = True
  410. refresher.join()
  411. display.resetscreen()
  412. raise
  413. except (KeyboardInterrupt, SystemExit):
  414. conn and conn.close()
  415. refresher.shutdown = True
  416. refresher.join()
  417. display.resetscreen()
  418. if __name__ == "__main__":
  419. evtop()