cursesmon.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.cursesmon
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. Graphical monitor of Celery events using curses.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import curses
  12. import sys
  13. import threading
  14. import time
  15. from datetime import datetime
  16. from itertools import count
  17. from textwrap import wrap
  18. from math import ceil
  19. from .. import __version__
  20. from .. import states
  21. from ..app import app_or_default
  22. from ..utils import abbr, abbrtask
  # Layout constants for the task table, all measured in character cells.
  # Subtracted from the window width to obtain the usable row width, and
  # used when positioning the coloured state cell of a task row.
  BORDER_SPACING = 4
  # Column at which rows and footer lines start.
  LEFT_BORDER_OFFSET = 3
  # Widest the uuid column may get; it shrinks on narrow windows.
  UUID_WIDTH = 36
  # Fixed width of the state column.
  STATE_WIDTH = 8
  # Fixed width of the timestamp column.
  TIMESTAMP_WIDTH = 8
  # Space reserved for the worker and task columns when working out how
  # much room is left for the uuid column.
  MIN_WORKER_WIDTH = 15
  MIN_TASK_WIDTH = 16
  class CursesMonitor(object):
      """Curses front-end that renders tasks and workers from an event state.

      The ``state`` object passed to the constructor is only accessed through
      ``tasks``, ``workers``, ``event_count``, ``task_count`` and
      ``tasks_by_timestamp()``.

      :meth:`init_screen` must be called before anything is drawn, and
      :meth:`resetscreen` restores the terminal afterwards.
      """
      keymap = {}
      win = None
      # Delay between redraws, in milliseconds (passed to ``curses.napms``).
      screen_delay = 10
      selected_task = None
      selected_position = 0
      selected_str = "Selected: "
      foreground = curses.COLOR_BLACK
      background = curses.COLOR_WHITE
      online_str = "Workers online: "
      help_title = "Keys: "
      # Must agree with the default keymap built in __init__:
      # J moves the selection down and K moves it up.
      help = ("j:down k:up i:info t:traceback r:result c:revoke ^c: quit")
      greet = "celeryev %s" % __version__
      info_str = "Info: "

      # Aliases translating special keys into keymap entries.
      # ``window.getkey()`` reports special keys by *name* (a string such as
      # "KEY_DOWN"), not by their integer code, so the names are what
      # actually match in handle_keypress().  The integer codes are kept for
      # backward compatibility.
      keyalias = {curses.KEY_DOWN: "J",
                  curses.KEY_UP: "K",
                  curses.KEY_ENTER: "I",
                  "KEY_DOWN": "J",
                  "KEY_UP": "K",
                  "KEY_ENTER": "I"}

      def __init__(self, state, keymap=None, app=None):
          """Bind the monitor to ``state``.

          :param state: event state to display.
          :param keymap: optional mapping of upper-case key to callable,
              merged over the default bindings.
          :param app: application instance, resolved by ``app_or_default``.
          """
          self.app = app_or_default(app)
          self.keymap = keymap or self.keymap
          self.state = state
          default_keymap = {"J": self.move_selection_down,
                            "K": self.move_selection_up,
                            "C": self.revoke_selection,
                            "T": self.selection_traceback,
                            "R": self.selection_result,
                            "I": self.selection_info,
                            "L": self.selection_rate_limit}
          self.keymap = dict(default_keymap, **self.keymap)

      def format_row(self, uuid, task, worker, timestamp, state):
          """Return a single table row cut to the display width.

          Columns are emitted in the order uuid, worker, task, timestamp,
          state.  The state and timestamp columns have fixed widths, the
          uuid column gets at most ``UUID_WIDTH`` cells, and what is left is
          split between the task and worker columns.
          """
          mx = self.display_width

          # include spacing
          detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
          uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH
          uuid_width = min(uuid_space, UUID_WIDTH)

          detail_width = detail_width - uuid_width - 1
          task_width = int(ceil(detail_width / 2.0))
          worker_width = detail_width - task_width - 1

          uuid = abbr(uuid, uuid_width).ljust(uuid_width)
          worker = abbr(worker, worker_width).ljust(worker_width)
          task = abbrtask(task, task_width).ljust(task_width)
          state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
          timestamp = timestamp.ljust(TIMESTAMP_WIDTH)

          row = "%s %s %s %s %s " % (uuid, worker, task, timestamp, state)
          # The old "cache screen_width on first call" branch was removed:
          # screen_width is a read-only property that never returns None,
          # so the branch could not run.
          return row[:mx]

      @property
      def screen_width(self):
          """Current window width in columns."""
          _, mx = self.win.getmaxyx()
          return mx

      @property
      def screen_height(self):
          """Current window height in rows."""
          my, _ = self.win.getmaxyx()
          return my

      @property
      def display_width(self):
          """Window width minus ``BORDER_SPACING``."""
          _, mx = self.win.getmaxyx()
          return mx - BORDER_SPACING

      @property
      def display_height(self):
          """Window height minus the 10 rows used by header and footer."""
          my, _ = self.win.getmaxyx()
          return my - 10

      @property
      def limit(self):
          """Maximum number of tasks fetched for display."""
          return self.display_height

      def find_position(self):
          """Return the index of the selected task in ``tasks`` (0 if absent)."""
          tasks = self.tasks
          if not tasks:
              return 0
          for i, e in enumerate(tasks):
              if self.selected_task == e[0]:
                  return i
          return 0

      def move_selection_up(self):
          """Select the task above the current one."""
          self.move_selection(-1)

      def move_selection_down(self):
          """Select the task below the current one."""
          self.move_selection(1)

      def move_selection(self, direction=1):
          """Move the selection by ``direction`` rows.

          Stepping past the last task selects the first one; stepping up
          from the first task selects the last one (negative index).
          """
          # Take a single snapshot: the ``tasks`` property queries the state
          # again on every access, so the list could otherwise change
          # between the emptiness check and the indexing below.
          tasks = self.tasks
          if not tasks:
              return
          pos = self.find_position()
          try:
              self.selected_task = tasks[pos + direction][0]
          except IndexError:
              self.selected_task = tasks[0][0]

      def handle_keypress(self):
          """Read one pending key, if any, and run the handler bound to it."""
          try:
              key = self.win.getkey().upper()
          except curses.error:
              # No-delay mode: getkey() raises when no key is waiting.
              return
          key = self.keyalias.get(key) or key
          handler = self.keymap.get(key)
          if handler is not None:
              handler()

      def alert(self, callback, title=None):
          """Show a full-window message and block until a key is pressed.

          :param callback: called as ``callback(my, mx, first_row)`` to draw
              the message body.
          :param title: optional heading drawn above the body.
          :returns: the upper-cased key that dismissed the alert.
          """
          self.win.erase()
          my, mx = self.win.getmaxyx()
          y = blank_line = count(2).next
          if title:
              self.win.addstr(y(), 3, title, curses.A_BOLD | curses.A_UNDERLINE)
              blank_line()
          callback(my, mx, y())
          self.win.addstr(my - 1, 0, "Press any key to continue...",
                          curses.A_BOLD)
          self.win.refresh()
          while 1:
              try:
                  return self.win.getkey().upper()
              except curses.error:
                  # Nothing pressed yet; sleep briefly instead of spinning.
                  self.nap()

      def selection_rate_limit(self):
          """Prompt for a new rate limit for the selected task's type."""
          if not self.selected_task:
              return curses.beep()
          task = self.state.tasks[self.selected_task]
          if not task.name:
              return curses.beep()

          my, mx = self.win.getmaxyx()
          r = "New rate limit: "
          self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
          self.win.addstr(my - 2, len(r) + 3, " " * (mx - len(r)))
          rlimit = self.readline(my - 2, 3 + len(r))

          if rlimit:
              reply = self.app.control.rate_limit(task.name,
                                                  rlimit.strip(), reply=True)
              self.alert_remote_control_reply(reply)

      def alert_remote_control_reply(self, reply):
          """Show the replies to a remote control command in an alert.

          :param reply: sequence of one-item mappings ``{host: response}``;
              an empty value is reported as "no replies".
          """

          def callback(my, mx, xs):
              y = count(xs).next
              if not reply:
                  self.win.addstr(y(), 3, "No replies received in 1s deadline.",
                                  curses.A_BOLD | curses.color_pair(2))
                  return

              for subreply in reply:
                  curline = y()

                  host, response = list(subreply.items())[0]
                  host = "%s: " % host
                  self.win.addstr(curline, 3, host, curses.A_BOLD)
                  attr = curses.A_NORMAL
                  text = ""
                  if "error" in response:
                      text = response["error"]
                      attr |= curses.color_pair(2)
                  elif "ok" in response:
                      text = response["ok"]
                      attr |= curses.color_pair(3)
                  self.win.addstr(curline, 3 + len(host), text, attr)

          return self.alert(callback, "Remote Control Command Replies")

      def readline(self, x, y):
          """Read a line of input, echoed starting at row ``x``, column ``y``.

          Enter finishes the line; Escape (27) discards it and returns an
          empty string.
          """
          buffer = str()
          curses.echo()
          try:
              i = 0
              while True:
                  ch = self.win.getch(x, y + i)
                  if ch != -1:
                      if ch in (10, curses.KEY_ENTER):  # enter
                          break
                      if ch in (27, ):
                          buffer = str()
                          break
                      if ch > 255:
                          # Key code of a special key (arrow, function key,
                          # ...): it is not a character and chr() would
                          # reject it.
                          continue
                      buffer += chr(ch)
                      i += 1
          finally:
              curses.noecho()
          return buffer

      def revoke_selection(self):
          """Revoke the selected task and show the workers' replies."""
          if not self.selected_task:
              return curses.beep()
          reply = self.app.control.revoke(self.selected_task, reply=True)
          self.alert_remote_control_reply(reply)

      def selection_info(self):
          """Show every known field of the selected task in an alert."""
          if not self.selected_task:
              return

          def alert_callback(my, mx, xs):
              y = count(xs).next
              # alert() draws its prompt on row my - 1, so the last row that
              # stays visible is my - 2.
              last_row = my - 2
              task = self.state.tasks[self.selected_task]
              info = task.info(extra=["state"])
              infoitems = [("args", info.pop("args", None)),
                           ("kwargs", info.pop("kwargs", None))]
              infoitems += list(info.items())
              for key, value in infoitems:
                  if key is None:
                      continue
                  value = str(value)
                  curline = y()
                  if curline > last_row:
                      # Out of room; writing further down would raise.
                      return
                  keys = key + ": "
                  self.win.addstr(curline, 3, keys, curses.A_BOLD)
                  wrapped = wrap(value, mx - 2)
                  if len(wrapped) == 1:
                      self.win.addstr(curline, len(keys) + 3,
                              abbr(wrapped[0],
                                   self.screen_width - (len(keys) + 3)))
                  else:
                      for subline in wrapped:
                          nexty = y()
                          if nexty >= last_row:
                              # The original tested ``>= my - 1`` before
                              # ``>= my``, so its ``break`` was unreachable
                              # and it kept writing below the window.  Mark
                              # the truncation on the last free row and stop.
                              if nexty == last_row:
                                  self.win.addstr(nexty, 3,
                                          abbr(" " * 8 + "[...]",
                                               self.screen_width - 4),
                                          curses.A_NORMAL)
                              return
                          self.win.addstr(nexty, 3,
                                  abbr(" " * 4 + subline,
                                       self.screen_width - 4),
                                  curses.A_NORMAL)

          return self.alert(alert_callback,
                            "Task details for %s" % self.selected_task)

      def selection_traceback(self):
          """Show the traceback of the selected task, if it is in an exception state."""
          if not self.selected_task:
              return curses.beep()
          task = self.state.tasks[self.selected_task]
          if task.state not in states.EXCEPTION_STATES:
              return curses.beep()

          def alert_callback(my, mx, xs):
              y = count(xs).next
              for line in task.traceback.split("\n"):
                  row = y()
                  if row >= my - 1:
                      # Row my - 1 holds the alert prompt and anything below
                      # it is outside the window.
                      break
                  self.win.addstr(row, 3, line)

          return self.alert(alert_callback,
                  "Task Exception Traceback for %s" % self.selected_task)

      def selection_result(self):
          """Show the result (or exception) of the selected task in an alert."""
          if not self.selected_task:
              return

          def alert_callback(my, mx, xs):
              y = count(xs).next
              task = self.state.tasks[self.selected_task]
              result = getattr(task, "result", None) or getattr(task,
                      "exception", None)
              # A task with neither result nor exception yields None, which
              # wrap() cannot handle; show an empty body instead.
              for line in wrap(result or "", mx - 2):
                  row = y()
                  if row >= my - 1:
                      break
                  self.win.addstr(row, 3, line)

          return self.alert(alert_callback,
                  "Task Result for %s" % self.selected_task)

      def display_task_row(self, lineno, task):
          """Draw ``task`` on row ``lineno``, highlighting it when selected."""
          state_color = self.state_colors.get(task.state)
          attr = curses.A_NORMAL
          if task.uuid == self.selected_task:
              attr = curses.A_STANDOUT
          timestamp = datetime.utcfromtimestamp(
                          task.timestamp or time.time())
          timef = timestamp.strftime("%H:%M:%S")
          hostname = task.worker.hostname if task.worker else '*NONE*'
          line = self.format_row(task.uuid, task.name,
                                 hostname,
                                 timef, task.state)
          self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)

          if state_color:
              # Redraw just the state cell in its colour.
              self.win.addstr(lineno,
                              len(line) - STATE_WIDTH + BORDER_SPACING - 1,
                              task.state, state_color | attr)

      def draw(self):
          """Handle a pending key press, then repaint the whole screen."""
          win = self.win
          self.handle_keypress()
          x = LEFT_BORDER_OFFSET
          y = blank_line = count(2).next
          my, mx = win.getmaxyx()
          win.erase()
          win.bkgd(" ", curses.color_pair(1))
          win.border()
          win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
          blank_line()
          win.addstr(y(), x, self.format_row("UUID", "TASK",
                                             "WORKER", "TIME", "STATE"),
                  curses.A_BOLD | curses.A_UNDERLINE)
          tasks = self.tasks
          if tasks:
              for row, (uuid, task) in enumerate(tasks):
                  if row > self.display_height:
                      break

                  if task.uuid:
                      lineno = y()
                      self.display_task_row(lineno, task)

          # -- Footer
          blank_line()
          win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)

          # Selected Task Info
          if self.selected_task:
              win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
              info = "Missing extended info"
              detail = ""
              try:
                  selection = self.state.tasks[self.selected_task]
              except KeyError:
                  pass
              else:
                  info = selection.info(["args", "kwargs",
                                         "result", "runtime", "eta"])
                  if "runtime" in info:
                      info["runtime"] = "%.2fs" % info["runtime"]
                  if "result" in info:
                      info["result"] = abbr(info["result"], 16)
                  info = " ".join("%s=%s" % (key, value)
                                  for key, value in info.items())
                  detail = "... -> key i"
              infowin = abbr(info,
                             self.screen_width - len(self.selected_str) - 2,
                             detail)
              win.addstr(my - 5, x + len(self.selected_str), infowin)
              # Make ellipsis bold
              if detail in infowin:
                  detailpos = len(infowin) - len(detail)
                  win.addstr(my - 5, x + len(self.selected_str) + detailpos,
                             detail, curses.A_BOLD)
          else:
              win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)

          # Workers
          if self.workers:
              win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
              win.addstr(my - 4, x + len(self.online_str),
                         ", ".join(sorted(self.workers)), curses.A_NORMAL)
          else:
              win.addstr(my - 4, x, "No workers discovered.")

          # Info
          win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
          win.addstr(my - 3, x + len(self.info_str),
                  "events:%s tasks:%s workers:%s/%s" % (
                      self.state.event_count, self.state.task_count,
                      len([w for w in self.state.workers.values()
                              if w.alive]),
                      len(self.state.workers)),
                  curses.A_DIM)

          # Help
          self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
          self.safe_add_str(my - 2, x + len(self.help_title), self.help,
                            curses.A_DIM)
          win.refresh()

      def safe_add_str(self, y, x, string, *args, **kwargs):
          """Like ``win.addstr`` but cuts ``string`` at the window's right edge."""
          if x + len(string) > self.screen_width:
              string = string[:self.screen_width - x]
          self.win.addstr(y, x, string, *args, **kwargs)

      def init_screen(self):
          """Start curses, and set up the colour pairs and state colours."""
          self.win = curses.initscr()
          self.win.nodelay(True)
          self.win.keypad(True)
          curses.start_color()
          curses.init_pair(1, self.foreground, self.background)
          # exception states
          curses.init_pair(2, curses.COLOR_RED, self.background)
          # successful state
          curses.init_pair(3, curses.COLOR_GREEN, self.background)
          # revoked state
          curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
          # greeting
          curses.init_pair(5, curses.COLOR_BLUE, self.background)
          # started state
          curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)

          self.state_colors = {states.SUCCESS: curses.color_pair(3),
                               states.REVOKED: curses.color_pair(4),
                               states.STARTED: curses.color_pair(6)}
          for state in states.EXCEPTION_STATES:
              self.state_colors[state] = curses.color_pair(2)

          curses.cbreak()

      def resetscreen(self):
          """Undo the terminal settings made by init_screen and end curses."""
          curses.nocbreak()
          self.win.keypad(False)
          curses.echo()
          curses.endwin()

      def nap(self):
          """Sleep for ``screen_delay`` milliseconds."""
          curses.napms(self.screen_delay)

      @property
      def tasks(self):
          """The first ``limit`` entries of ``state.tasks_by_timestamp()``."""
          return self.state.tasks_by_timestamp()[:self.limit]

      @property
      def workers(self):
          """Hostnames of the workers currently flagged alive."""
          return [hostname
                      for hostname, w in self.state.workers.items()
                          if w.alive]
  class DisplayThread(threading.Thread):
      """Background thread that keeps repainting a display.

      Set ``shutdown`` to a true value to make :meth:`run` return after the
      cycle in progress.
      """

      def __init__(self, display):
          threading.Thread.__init__(self)
          self.shutdown = False
          self.display = display

      def run(self):
          """Alternate ``display.draw()`` and ``display.nap()`` until shut down."""
          while True:
              if self.shutdown:
                  break
              screen = self.display
              screen.draw()
              # Re-read the attribute, exactly as the plain
              # ``self.display.nap()`` call did.
              screen = self.display
              screen.nap()
  def capture_events(app, state, display):
      """Feed broker events into ``state`` forever, reconnecting on failure.

      Each pass opens a broker connection, re-initialises ``display``'s
      screen and drains events through a receiver that sends every event to
      ``state.event``.  Connection and channel errors are written to stderr
      and the loop starts over; the function never returns normally.
      """

      def report_retry(exc, interval):
          message = "Connection Error: %r. Retry in %ss." % (exc, interval)
          sys.stderr.write(message)

      while True:
          sys.stderr.write("-> evtop: starting capture...\n")
          with app.broker_connection() as conn:
              try:
                  conn.ensure_connection(
                      report_retry, app.conf.BROKER_CONNECTION_MAX_RETRIES)
                  receiver = app.events.Receiver(
                      conn, handlers={"*": state.event})
                  display.resetscreen()
                  display.init_screen()
                  with receiver.consumer():
                      receiver.drain_events(timeout=1, ignore_timeouts=True)
              except (conn.connection_errors, conn.channel_errors):
                  # Same exception object the ``except ..., exc`` form binds.
                  lost = sys.exc_info()[1]
                  sys.stderr.write("Connection lost: %r" % (lost, ))
  def evtop(app=None):
      """Run the curses event monitor until interrupted.

      Starts a :class:`DisplayThread` for drawing and captures events on the
      calling thread.  On any exit path with an exception the display thread
      is stopped and the terminal restored; ``KeyboardInterrupt`` and
      ``SystemExit`` are then swallowed, every other exception is re-raised.
      """
      app = app_or_default(app)
      state = app.events.State()
      monitor = CursesMonitor(state, app=app)
      monitor.init_screen()
      painter = DisplayThread(monitor)
      painter.start()

      def shut_down():
          # Stop the painter first so nothing draws after curses has ended.
          painter.shutdown = True
          painter.join()
          monitor.resetscreen()

      try:
          capture_events(app, state, monitor)
      except (KeyboardInterrupt, SystemExit):
          shut_down()
      except Exception:
          shut_down()
          raise
  # Start the monitor when this file is run as a script.
  if __name__ == "__main__":
      evtop()