cursesmon.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.cursesmon
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. Graphical monitor of Celery events using curses.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. import curses
  11. import sys
  12. import threading
  13. import time
  14. from datetime import datetime
  15. from itertools import count
  16. from textwrap import wrap
  17. from math import ceil
  18. from .. import __version__
  19. from .. import states
  20. from ..app import app_or_default
  21. from ..utils import abbr, abbrtask
# Layout constants for the task table; all values are counts of character cells.

# Subtracted from the window width to get the usable row width.
BORDER_SPACING = 4
# Column at which table rows and footer text start being drawn.
LEFT_BORDER_OFFSET = 3
# Upper bound for the width of the UUID column.
UUID_WIDTH = 36
# Width the STATE column is abbreviated/padded to.
STATE_WIDTH = 8
# Width the TIME column is padded to.
TIMESTAMP_WIDTH = 8
# Room reserved for the worker and task columns when deciding how much
# space is left over for the UUID column.
MIN_WORKER_WIDTH = 15
MIN_TASK_WIDTH = 16
  29. class CursesMonitor(object):
  30. keymap = {}
  31. win = None
  32. screen_width = None
  33. screen_delay = 10
  34. selected_task = None
  35. selected_position = 0
  36. selected_str = "Selected: "
  37. foreground = curses.COLOR_BLACK
  38. background = curses.COLOR_WHITE
  39. online_str = "Workers online: "
  40. help_title = "Keys: "
  41. help = ("j:up k:down i:info t:traceback r:result c:revoke ^c: quit")
  42. greet = "celeryev %s" % __version__
  43. info_str = "Info: "
  44. def __init__(self, state, keymap=None, app=None):
  45. self.app = app_or_default(app)
  46. self.keymap = keymap or self.keymap
  47. self.state = state
  48. default_keymap = {"J": self.move_selection_down,
  49. "K": self.move_selection_up,
  50. "C": self.revoke_selection,
  51. "T": self.selection_traceback,
  52. "R": self.selection_result,
  53. "I": self.selection_info,
  54. "L": self.selection_rate_limit}
  55. self.keymap = dict(default_keymap, **self.keymap)
  56. def format_row(self, uuid, task, worker, timestamp, state):
  57. mx = self.display_width
  58. # include spacing
  59. detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
  60. uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH
  61. if uuid_space < UUID_WIDTH:
  62. uuid_width = uuid_space
  63. else:
  64. uuid_width = UUID_WIDTH
  65. detail_width = detail_width - uuid_width - 1
  66. task_width = int(ceil(detail_width / 2.0))
  67. worker_width = detail_width - task_width - 1
  68. uuid = abbr(uuid, uuid_width).ljust(uuid_width)
  69. worker = abbr(worker, worker_width).ljust(worker_width)
  70. task = abbrtask(task, task_width).ljust(task_width)
  71. state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
  72. timestamp = timestamp.ljust(TIMESTAMP_WIDTH)
  73. row = "%s %s %s %s %s " % (uuid, worker, task, timestamp, state)
  74. if self.screen_width is None:
  75. self.screen_width = len(row[:mx])
  76. return row[:mx]
  77. @property
  78. def screen_width(self):
  79. _, mx = self.win.getmaxyx()
  80. return mx
  81. @property
  82. def screen_height(self):
  83. my, _ = self.win.getmaxyx()
  84. return my
  85. @property
  86. def display_width(self):
  87. _, mx = self.win.getmaxyx()
  88. return mx - BORDER_SPACING
  89. @property
  90. def display_height(self):
  91. my, _ = self.win.getmaxyx()
  92. return my - 10
  93. @property
  94. def limit(self):
  95. return self.display_height
  96. def find_position(self):
  97. if not self.tasks:
  98. return 0
  99. for i, e in enumerate(self.tasks):
  100. if self.selected_task == e[0]:
  101. return i
  102. return 0
  103. def move_selection_up(self):
  104. self.move_selection(-1)
  105. def move_selection_down(self):
  106. self.move_selection(1)
  107. def move_selection(self, direction=1):
  108. if not self.tasks:
  109. return
  110. pos = self.find_position()
  111. try:
  112. self.selected_task = self.tasks[pos + direction][0]
  113. except IndexError:
  114. self.selected_task = self.tasks[0][0]
  115. keyalias = {curses.KEY_DOWN: "J",
  116. curses.KEY_UP: "K",
  117. curses.KEY_ENTER: "I"}
  118. def handle_keypress(self):
  119. try:
  120. key = self.win.getkey().upper()
  121. except:
  122. return
  123. key = self.keyalias.get(key) or key
  124. handler = self.keymap.get(key)
  125. if handler is not None:
  126. handler()
  127. def alert(self, callback, title=None):
  128. self.win.erase()
  129. my, mx = self.win.getmaxyx()
  130. y = blank_line = count(2).next
  131. if title:
  132. self.win.addstr(y(), 3, title, curses.A_BOLD | curses.A_UNDERLINE)
  133. blank_line()
  134. callback(my, mx, y())
  135. self.win.addstr(my - 1, 0, "Press any key to continue...",
  136. curses.A_BOLD)
  137. self.win.refresh()
  138. while 1:
  139. try:
  140. return self.win.getkey().upper()
  141. except:
  142. pass
  143. def selection_rate_limit(self):
  144. if not self.selected_task:
  145. return curses.beep()
  146. task = self.state.tasks[self.selected_task]
  147. if not task.name:
  148. return curses.beep()
  149. my, mx = self.win.getmaxyx()
  150. r = "New rate limit: "
  151. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  152. self.win.addstr(my - 2, len(r) + 3, " " * (mx - len(r)))
  153. rlimit = self.readline(my - 2, 3 + len(r))
  154. if rlimit:
  155. reply = self.app.control.rate_limit(task.name,
  156. rlimit.strip(), reply=True)
  157. self.alert_remote_control_reply(reply)
  158. def alert_remote_control_reply(self, reply):
  159. def callback(my, mx, xs):
  160. y = count(xs).next
  161. if not reply:
  162. self.win.addstr(y(), 3, "No replies received in 1s deadline.",
  163. curses.A_BOLD + curses.color_pair(2))
  164. return
  165. for subreply in reply:
  166. curline = y()
  167. host, response = subreply.items()[0]
  168. host = "%s: " % host
  169. self.win.addstr(curline, 3, host, curses.A_BOLD)
  170. attr = curses.A_NORMAL
  171. text = ""
  172. if "error" in response:
  173. text = response["error"]
  174. attr |= curses.color_pair(2)
  175. elif "ok" in response:
  176. text = response["ok"]
  177. attr |= curses.color_pair(3)
  178. self.win.addstr(curline, 3 + len(host), text, attr)
  179. return self.alert(callback, "Remote Control Command Replies")
  180. def readline(self, x, y):
  181. buffer = str()
  182. curses.echo()
  183. try:
  184. i = 0
  185. while True:
  186. ch = self.win.getch(x, y + i)
  187. if ch != -1:
  188. if ch in (10, curses.KEY_ENTER): # enter
  189. break
  190. if ch in (27, ):
  191. buffer = str()
  192. break
  193. buffer += chr(ch)
  194. i += 1
  195. finally:
  196. curses.noecho()
  197. return buffer
  198. def revoke_selection(self):
  199. if not self.selected_task:
  200. return curses.beep()
  201. reply = self.app.control.revoke(self.selected_task, reply=True)
  202. self.alert_remote_control_reply(reply)
  203. def selection_info(self):
  204. if not self.selected_task:
  205. return
  206. def alert_callback(mx, my, xs):
  207. my, mx = self.win.getmaxyx()
  208. y = count(xs).next
  209. task = self.state.tasks[self.selected_task]
  210. info = task.info(extra=["state"])
  211. infoitems = [("args", info.pop("args", None)),
  212. ("kwargs", info.pop("kwargs", None))] + info.items()
  213. for key, value in infoitems:
  214. if key is None:
  215. continue
  216. value = str(value)
  217. curline = y()
  218. keys = key + ": "
  219. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  220. wrapped = wrap(value, mx - 2)
  221. if len(wrapped) == 1:
  222. self.win.addstr(curline, len(keys) + 3,
  223. abbr(wrapped[0],
  224. self.screen_width - (len(keys) + 3)))
  225. else:
  226. for subline in wrapped:
  227. nexty = y()
  228. if nexty >= my - 1:
  229. subline = " " * 4 + "[...]"
  230. elif nexty >= my:
  231. break
  232. self.win.addstr(nexty, 3,
  233. abbr(" " * 4 + subline, self.screen_width - 4),
  234. curses.A_NORMAL)
  235. return self.alert(alert_callback,
  236. "Task details for %s" % self.selected_task)
  237. def selection_traceback(self):
  238. if not self.selected_task:
  239. return curses.beep()
  240. task = self.state.tasks[self.selected_task]
  241. if task.state not in states.EXCEPTION_STATES:
  242. return curses.beep()
  243. def alert_callback(my, mx, xs):
  244. y = count(xs).next
  245. for line in task.traceback.split("\n"):
  246. self.win.addstr(y(), 3, line)
  247. return self.alert(alert_callback,
  248. "Task Exception Traceback for %s" % self.selected_task)
  249. def selection_result(self):
  250. if not self.selected_task:
  251. return
  252. def alert_callback(my, mx, xs):
  253. y = count(xs).next
  254. task = self.state.tasks[self.selected_task]
  255. result = getattr(task, "result", None) or getattr(task,
  256. "exception", None)
  257. for line in wrap(result, mx - 2):
  258. self.win.addstr(y(), 3, line)
  259. return self.alert(alert_callback,
  260. "Task Result for %s" % self.selected_task)
  261. def display_task_row(self, lineno, task):
  262. state_color = self.state_colors.get(task.state)
  263. attr = curses.A_NORMAL
  264. if task.uuid == self.selected_task:
  265. attr = curses.A_STANDOUT
  266. timestamp = datetime.utcfromtimestamp(
  267. task.timestamp or time.time())
  268. timef = timestamp.strftime("%H:%M:%S")
  269. hostname = task.worker.hostname if task.worker else '*NONE*'
  270. line = self.format_row(task.uuid, task.name,
  271. hostname,
  272. timef, task.state)
  273. self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)
  274. if state_color:
  275. self.win.addstr(lineno,
  276. len(line) - STATE_WIDTH + BORDER_SPACING - 1,
  277. task.state, state_color | attr)
  278. def draw(self):
  279. win = self.win
  280. self.handle_keypress()
  281. x = LEFT_BORDER_OFFSET
  282. y = blank_line = count(2).next
  283. my, mx = win.getmaxyx()
  284. win.erase()
  285. win.bkgd(" ", curses.color_pair(1))
  286. win.border()
  287. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  288. blank_line()
  289. win.addstr(y(), x, self.format_row("UUID", "TASK",
  290. "WORKER", "TIME", "STATE"),
  291. curses.A_BOLD | curses.A_UNDERLINE)
  292. tasks = self.tasks
  293. if tasks:
  294. for row, (uuid, task) in enumerate(tasks):
  295. if row > self.display_height:
  296. break
  297. if task.uuid:
  298. lineno = y()
  299. self.display_task_row(lineno, task)
  300. # -- Footer
  301. blank_line()
  302. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)
  303. # Selected Task Info
  304. if self.selected_task:
  305. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  306. info = "Missing extended info"
  307. detail = ""
  308. try:
  309. selection = self.state.tasks[self.selected_task]
  310. except KeyError:
  311. pass
  312. else:
  313. info = selection.info(["args", "kwargs",
  314. "result", "runtime", "eta"])
  315. if "runtime" in info:
  316. info["runtime"] = "%.2fs" % info["runtime"]
  317. if "result" in info:
  318. info["result"] = abbr(info["result"], 16)
  319. info = " ".join("%s=%s" % (key, value)
  320. for key, value in info.items())
  321. detail = "... -> key i"
  322. infowin = abbr(info,
  323. self.screen_width - len(self.selected_str) - 2,
  324. detail)
  325. win.addstr(my - 5, x + len(self.selected_str), infowin)
  326. # Make ellipsis bold
  327. if detail in infowin:
  328. detailpos = len(infowin) - len(detail)
  329. win.addstr(my - 5, x + len(self.selected_str) + detailpos,
  330. detail, curses.A_BOLD)
  331. else:
  332. win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)
  333. # Workers
  334. if self.workers:
  335. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  336. win.addstr(my - 4, x + len(self.online_str),
  337. ", ".join(sorted(self.workers)), curses.A_NORMAL)
  338. else:
  339. win.addstr(my - 4, x, "No workers discovered.")
  340. # Info
  341. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  342. win.addstr(my - 3, x + len(self.info_str),
  343. "events:%s tasks:%s workers:%s/%s" % (
  344. self.state.event_count, self.state.task_count,
  345. len([w for w in self.state.workers.values()
  346. if w.alive]),
  347. len(self.state.workers)),
  348. curses.A_DIM)
  349. # Help
  350. self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
  351. self.safe_add_str(my - 2, x + len(self.help_title), self.help,
  352. curses.A_DIM)
  353. win.refresh()
  354. def safe_add_str(self, y, x, string, *args, **kwargs):
  355. if x + len(string) > self.screen_width:
  356. string = string[:self.screen_width - x]
  357. self.win.addstr(y, x, string, *args, **kwargs)
  358. def init_screen(self):
  359. self.win = curses.initscr()
  360. self.win.nodelay(True)
  361. self.win.keypad(True)
  362. curses.start_color()
  363. curses.init_pair(1, self.foreground, self.background)
  364. # exception states
  365. curses.init_pair(2, curses.COLOR_RED, self.background)
  366. # successful state
  367. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  368. # revoked state
  369. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  370. # greeting
  371. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  372. # started state
  373. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  374. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  375. states.REVOKED: curses.color_pair(4),
  376. states.STARTED: curses.color_pair(6)}
  377. for state in states.EXCEPTION_STATES:
  378. self.state_colors[state] = curses.color_pair(2)
  379. curses.cbreak()
  380. def resetscreen(self):
  381. curses.nocbreak()
  382. self.win.keypad(False)
  383. curses.echo()
  384. curses.endwin()
  385. def nap(self):
  386. curses.napms(self.screen_delay)
  387. @property
  388. def tasks(self):
  389. return self.state.tasks_by_timestamp()[:self.limit]
  390. @property
  391. def workers(self):
  392. return [hostname
  393. for hostname, w in self.state.workers.items()
  394. if w.alive]
  395. class DisplayThread(threading.Thread):
  396. def __init__(self, display):
  397. self.display = display
  398. self.shutdown = False
  399. threading.Thread.__init__(self)
  400. def run(self):
  401. while not self.shutdown:
  402. self.display.draw()
  403. self.display.nap()
  404. def evtop(app=None):
  405. sys.stderr.write("-> evtop: starting capture...\n")
  406. app = app_or_default(app)
  407. state = app.events.State()
  408. conn = app.broker_connection()
  409. recv = app.events.Receiver(conn, handlers={"*": state.event})
  410. capture = recv.itercapture()
  411. capture.next()
  412. display = CursesMonitor(state, app=app)
  413. display.init_screen()
  414. refresher = DisplayThread(display)
  415. refresher.start()
  416. try:
  417. capture.next()
  418. except Exception:
  419. refresher.shutdown = True
  420. refresher.join()
  421. display.resetscreen()
  422. raise
  423. except (KeyboardInterrupt, SystemExit):
  424. conn and conn.close()
  425. refresher.shutdown = True
  426. refresher.join()
  427. display.resetscreen()
# Start the monitor only when this file is executed as a script.
if __name__ == "__main__":
    evtop()