cursesmon.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. # -*- coding: utf-8 -*-
  2. """Graphical monitor of Celery events using curses."""
  3. import curses
  4. import sys
  5. import threading
  6. from datetime import datetime
  7. from itertools import count
  8. from textwrap import wrap
  9. from time import time
  10. from math import ceil
  11. from celery import VERSION_BANNER
  12. from celery import states
  13. from celery.app import app_or_default
  14. from celery.utils.text import abbr, abbrtask
  15. __all__ = ['CursesMonitor', 'evtop']
  16. BORDER_SPACING = 4
  17. LEFT_BORDER_OFFSET = 3
  18. UUID_WIDTH = 36
  19. STATE_WIDTH = 8
  20. TIMESTAMP_WIDTH = 8
  21. MIN_WORKER_WIDTH = 15
  22. MIN_TASK_WIDTH = 16
  23. # this module is considered experimental
  24. # we don't care about coverage.
  25. STATUS_SCREEN = """\
  26. events: {s.event_count} tasks:{s.task_count} workers:{w_alive}/{w_all}
  27. """
  28. class CursesMonitor: # pragma: no cover
  29. keymap = {}
  30. win = None
  31. screen_width = None
  32. screen_delay = 10
  33. selected_task = None
  34. selected_position = 0
  35. selected_str = 'Selected: '
  36. foreground = curses.COLOR_BLACK
  37. background = curses.COLOR_WHITE
  38. online_str = 'Workers online: '
  39. help_title = 'Keys: '
  40. help = ('j:down k:up i:info t:traceback r:result c:revoke ^c: quit')
  41. greet = 'celery events {0}'.format(VERSION_BANNER)
  42. info_str = 'Info: '
  43. def __init__(self, state, app, keymap=None):
  44. self.app = app
  45. self.keymap = keymap or self.keymap
  46. self.state = state
  47. default_keymap = {
  48. 'J': self.move_selection_down,
  49. 'K': self.move_selection_up,
  50. 'C': self.revoke_selection,
  51. 'T': self.selection_traceback,
  52. 'R': self.selection_result,
  53. 'I': self.selection_info,
  54. 'L': self.selection_rate_limit,
  55. }
  56. self.keymap = dict(default_keymap, **self.keymap)
  57. self.lock = threading.RLock()
  58. def format_row(self, uuid, task, worker, timestamp, state):
  59. mx = self.display_width
  60. # include spacing
  61. detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
  62. uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH
  63. if uuid_space < UUID_WIDTH:
  64. uuid_width = uuid_space
  65. else:
  66. uuid_width = UUID_WIDTH
  67. detail_width = detail_width - uuid_width - 1
  68. task_width = int(ceil(detail_width / 2.0))
  69. worker_width = detail_width - task_width - 1
  70. uuid = abbr(uuid, uuid_width).ljust(uuid_width)
  71. worker = abbr(worker, worker_width).ljust(worker_width)
  72. task = abbrtask(task, task_width).ljust(task_width)
  73. state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
  74. timestamp = timestamp.ljust(TIMESTAMP_WIDTH)
  75. row = '{0} {1} {2} {3} {4} '.format(uuid, worker, task,
  76. timestamp, state)
  77. if self.screen_width is None:
  78. self.screen_width = len(row[:mx])
  79. return row[:mx]
  80. @property
  81. def screen_width(self):
  82. _, mx = self.win.getmaxyx()
  83. return mx
  84. @property
  85. def screen_height(self):
  86. my, _ = self.win.getmaxyx()
  87. return my
  88. @property
  89. def display_width(self):
  90. _, mx = self.win.getmaxyx()
  91. return mx - BORDER_SPACING
  92. @property
  93. def display_height(self):
  94. my, _ = self.win.getmaxyx()
  95. return my - 10
  96. @property
  97. def limit(self):
  98. return self.display_height
  99. def find_position(self):
  100. if not self.tasks:
  101. return 0
  102. for i, e in enumerate(self.tasks):
  103. if self.selected_task == e[0]:
  104. return i
  105. return 0
  106. def move_selection_up(self):
  107. self.move_selection(-1)
  108. def move_selection_down(self):
  109. self.move_selection(1)
  110. def move_selection(self, direction=1):
  111. if not self.tasks:
  112. return
  113. pos = self.find_position()
  114. try:
  115. self.selected_task = self.tasks[pos + direction][0]
  116. except IndexError:
  117. self.selected_task = self.tasks[0][0]
  118. keyalias = {curses.KEY_DOWN: 'J',
  119. curses.KEY_UP: 'K',
  120. curses.KEY_ENTER: 'I'}
  121. def handle_keypress(self):
  122. try:
  123. key = self.win.getkey().upper()
  124. except:
  125. return
  126. key = self.keyalias.get(key) or key
  127. handler = self.keymap.get(key)
  128. if handler is not None:
  129. handler()
  130. def alert(self, callback, title=None):
  131. self.win.erase()
  132. my, mx = self.win.getmaxyx()
  133. y = blank_line = count(2)
  134. if title:
  135. self.win.addstr(next(y), 3, title,
  136. curses.A_BOLD | curses.A_UNDERLINE)
  137. next(blank_line)
  138. callback(my, mx, next(y))
  139. self.win.addstr(my - 1, 0, 'Press any key to continue...',
  140. curses.A_BOLD)
  141. self.win.refresh()
  142. while 1:
  143. try:
  144. return self.win.getkey().upper()
  145. except:
  146. pass
  147. def selection_rate_limit(self):
  148. if not self.selected_task:
  149. return curses.beep()
  150. task = self.state.tasks[self.selected_task]
  151. if not task.name:
  152. return curses.beep()
  153. my, mx = self.win.getmaxyx()
  154. r = 'New rate limit: '
  155. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  156. self.win.addstr(my - 2, len(r) + 3, ' ' * (mx - len(r)))
  157. rlimit = self.readline(my - 2, 3 + len(r))
  158. if rlimit:
  159. reply = self.app.control.rate_limit(task.name,
  160. rlimit.strip(), reply=True)
  161. self.alert_remote_control_reply(reply)
  162. def alert_remote_control_reply(self, reply):
  163. def callback(my, mx, xs):
  164. y = count(xs)
  165. if not reply:
  166. self.win.addstr(
  167. next(y), 3, 'No replies received in 1s deadline.',
  168. curses.A_BOLD + curses.color_pair(2),
  169. )
  170. return
  171. for subreply in reply:
  172. curline = next(y)
  173. host, response = next(subreply.items())
  174. host = '{0}: '.format(host)
  175. self.win.addstr(curline, 3, host, curses.A_BOLD)
  176. attr = curses.A_NORMAL
  177. text = ''
  178. if 'error' in response:
  179. text = response['error']
  180. attr |= curses.color_pair(2)
  181. elif 'ok' in response:
  182. text = response['ok']
  183. attr |= curses.color_pair(3)
  184. self.win.addstr(curline, 3 + len(host), text, attr)
  185. return self.alert(callback, 'Remote Control Command Replies')
  186. def readline(self, x, y):
  187. buffer = str()
  188. curses.echo()
  189. try:
  190. i = 0
  191. while 1:
  192. ch = self.win.getch(x, y + i)
  193. if ch != -1:
  194. if ch in (10, curses.KEY_ENTER): # enter
  195. break
  196. if ch in (27,):
  197. buffer = str()
  198. break
  199. buffer += chr(ch)
  200. i += 1
  201. finally:
  202. curses.noecho()
  203. return buffer
  204. def revoke_selection(self):
  205. if not self.selected_task:
  206. return curses.beep()
  207. reply = self.app.control.revoke(self.selected_task, reply=True)
  208. self.alert_remote_control_reply(reply)
  209. def selection_info(self):
  210. if not self.selected_task:
  211. return
  212. def alert_callback(mx, my, xs):
  213. my, mx = self.win.getmaxyx()
  214. y = count(xs)
  215. task = self.state.tasks[self.selected_task]
  216. info = task.info(extra=['state'])
  217. infoitems = [
  218. ('args', info.pop('args', None)),
  219. ('kwargs', info.pop('kwargs', None))
  220. ] + list(info.items())
  221. for key, value in infoitems:
  222. if key is None:
  223. continue
  224. value = str(value)
  225. curline = next(y)
  226. keys = key + ': '
  227. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  228. wrapped = wrap(value, mx - 2)
  229. if len(wrapped) == 1:
  230. self.win.addstr(
  231. curline, len(keys) + 3,
  232. abbr(wrapped[0],
  233. self.screen_width - (len(keys) + 3)))
  234. else:
  235. for subline in wrapped:
  236. nexty = next(y)
  237. if nexty >= my - 1:
  238. subline = ' ' * 4 + '[...]'
  239. elif nexty >= my:
  240. break
  241. self.win.addstr(
  242. nexty, 3,
  243. abbr(' ' * 4 + subline, self.screen_width - 4),
  244. curses.A_NORMAL,
  245. )
  246. return self.alert(
  247. alert_callback, 'Task details for {0.selected_task}'.format(self),
  248. )
  249. def selection_traceback(self):
  250. if not self.selected_task:
  251. return curses.beep()
  252. task = self.state.tasks[self.selected_task]
  253. if task.state not in states.EXCEPTION_STATES:
  254. return curses.beep()
  255. def alert_callback(my, mx, xs):
  256. y = count(xs)
  257. for line in task.traceback.split('\n'):
  258. self.win.addstr(next(y), 3, line)
  259. return self.alert(
  260. alert_callback,
  261. 'Task Exception Traceback for {0.selected_task}'.format(self),
  262. )
  263. def selection_result(self):
  264. if not self.selected_task:
  265. return
  266. def alert_callback(my, mx, xs):
  267. y = count(xs)
  268. task = self.state.tasks[self.selected_task]
  269. result = (getattr(task, 'result', None) or
  270. getattr(task, 'exception', None))
  271. for line in wrap(result or '', mx - 2):
  272. self.win.addstr(next(y), 3, line)
  273. return self.alert(
  274. alert_callback,
  275. 'Task Result for {0.selected_task}'.format(self),
  276. )
  277. def display_task_row(self, lineno, task):
  278. state_color = self.state_colors.get(task.state)
  279. attr = curses.A_NORMAL
  280. if task.uuid == self.selected_task:
  281. attr = curses.A_STANDOUT
  282. timestamp = datetime.utcfromtimestamp(
  283. task.timestamp or time(),
  284. )
  285. timef = timestamp.strftime('%H:%M:%S')
  286. hostname = task.worker.hostname if task.worker else '*NONE*'
  287. line = self.format_row(task.uuid, task.name,
  288. hostname,
  289. timef, task.state)
  290. self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)
  291. if state_color:
  292. self.win.addstr(lineno,
  293. len(line) - STATE_WIDTH + BORDER_SPACING - 1,
  294. task.state, state_color | attr)
  295. def draw(self):
  296. with self.lock:
  297. win = self.win
  298. self.handle_keypress()
  299. x = LEFT_BORDER_OFFSET
  300. y = blank_line = count(2)
  301. my, mx = win.getmaxyx()
  302. win.erase()
  303. win.bkgd(' ', curses.color_pair(1))
  304. win.border()
  305. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  306. next(blank_line)
  307. win.addstr(next(y), x, self.format_row('UUID', 'TASK',
  308. 'WORKER', 'TIME', 'STATE'),
  309. curses.A_BOLD | curses.A_UNDERLINE)
  310. tasks = self.tasks
  311. if tasks:
  312. for row, (uuid, task) in enumerate(tasks):
  313. if row > self.display_height:
  314. break
  315. if task.uuid:
  316. lineno = next(y)
  317. self.display_task_row(lineno, task)
  318. # -- Footer
  319. next(blank_line)
  320. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)
  321. # Selected Task Info
  322. if self.selected_task:
  323. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  324. info = 'Missing extended info'
  325. detail = ''
  326. try:
  327. selection = self.state.tasks[self.selected_task]
  328. except KeyError:
  329. pass
  330. else:
  331. info = selection.info()
  332. if 'runtime' in info:
  333. info['runtime'] = '{0:.2f}'.format(info['runtime'])
  334. if 'result' in info:
  335. info['result'] = abbr(info['result'], 16)
  336. info = ' '.join(
  337. '{0}={1}'.format(key, value)
  338. for key, value in info.items()
  339. )
  340. detail = '... -> key i'
  341. infowin = abbr(info,
  342. self.screen_width - len(self.selected_str) - 2,
  343. detail)
  344. win.addstr(my - 5, x + len(self.selected_str), infowin)
  345. # Make ellipsis bold
  346. if detail in infowin:
  347. detailpos = len(infowin) - len(detail)
  348. win.addstr(my - 5, x + len(self.selected_str) + detailpos,
  349. detail, curses.A_BOLD)
  350. else:
  351. win.addstr(my - 5, x, 'No task selected', curses.A_NORMAL)
  352. # Workers
  353. if self.workers:
  354. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  355. win.addstr(my - 4, x + len(self.online_str),
  356. ', '.join(sorted(self.workers)), curses.A_NORMAL)
  357. else:
  358. win.addstr(my - 4, x, 'No workers discovered.')
  359. # Info
  360. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  361. win.addstr(
  362. my - 3, x + len(self.info_str),
  363. STATUS_SCREEN.format(
  364. s=self.state,
  365. w_alive=len([
  366. w for w in self.state.workers.values()
  367. if w.alive
  368. ]),
  369. w_all=len(self.state.workers),
  370. ),
  371. curses.A_DIM,
  372. )
  373. # Help
  374. self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
  375. self.safe_add_str(my - 2, x + len(self.help_title), self.help,
  376. curses.A_DIM)
  377. win.refresh()
  378. def safe_add_str(self, y, x, string, *args, **kwargs):
  379. if x + len(string) > self.screen_width:
  380. string = string[:self.screen_width - x]
  381. self.win.addstr(y, x, string, *args, **kwargs)
  382. def init_screen(self):
  383. with self.lock:
  384. self.win = curses.initscr()
  385. self.win.nodelay(True)
  386. self.win.keypad(True)
  387. curses.start_color()
  388. curses.init_pair(1, self.foreground, self.background)
  389. # exception states
  390. curses.init_pair(2, curses.COLOR_RED, self.background)
  391. # successful state
  392. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  393. # revoked state
  394. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  395. # greeting
  396. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  397. # started state
  398. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  399. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  400. states.REVOKED: curses.color_pair(4),
  401. states.STARTED: curses.color_pair(6)}
  402. for state in states.EXCEPTION_STATES:
  403. self.state_colors[state] = curses.color_pair(2)
  404. curses.cbreak()
  405. def resetscreen(self):
  406. with self.lock:
  407. curses.nocbreak()
  408. self.win.keypad(False)
  409. curses.echo()
  410. curses.endwin()
  411. def nap(self):
  412. curses.napms(self.screen_delay)
  413. @property
  414. def tasks(self):
  415. return list(self.state.tasks_by_time(limit=self.limit))
  416. @property
  417. def workers(self):
  418. return [hostname for hostname, w in self.state.workers.items()
  419. if w.alive]
  420. class DisplayThread(threading.Thread): # pragma: no cover
  421. def __init__(self, display):
  422. self.display = display
  423. self.shutdown = False
  424. threading.Thread.__init__(self)
  425. def run(self):
  426. while not self.shutdown:
  427. self.display.draw()
  428. self.display.nap()
  429. def capture_events(app, state, display): # pragma: no cover
  430. def on_connection_error(exc, interval):
  431. print('Connection Error: {0!r}. Retry in {1}s.'.format(
  432. exc, interval), file=sys.stderr)
  433. while 1:
  434. print('-> evtop: starting capture...', file=sys.stderr)
  435. with app.connection_for_read() as conn:
  436. try:
  437. conn.ensure_connection(on_connection_error,
  438. app.conf.broker_connection_max_retries)
  439. recv = app.events.Receiver(conn, handlers={'*': state.event})
  440. display.resetscreen()
  441. display.init_screen()
  442. recv.capture()
  443. except conn.connection_errors + conn.channel_errors as exc:
  444. print('Connection lost: {0!r}'.format(exc), file=sys.stderr)
  445. def evtop(app=None): # pragma: no cover
  446. app = app_or_default(app)
  447. state = app.events.State()
  448. display = CursesMonitor(state, app)
  449. display.init_screen()
  450. refresher = DisplayThread(display)
  451. refresher.start()
  452. try:
  453. capture_events(app, state, display)
  454. except Exception:
  455. refresher.shutdown = True
  456. refresher.join()
  457. display.resetscreen()
  458. raise
  459. except (KeyboardInterrupt, SystemExit):
  460. refresher.shutdown = True
  461. refresher.join()
  462. display.resetscreen()
  463. if __name__ == '__main__': # pragma: no cover
  464. evtop()