cursesmon.py 17 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.cursesmon
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. Graphical monitor of Celery events using curses.
  6. """
  7. from __future__ import absolute_import, print_function
  8. import curses
  9. import sys
  10. import threading
  11. import time
  12. from datetime import datetime
  13. from itertools import count
  14. from textwrap import wrap
  15. from math import ceil
  16. from celery import VERSION_BANNER
  17. from celery import states
  18. from celery.app import app_or_default
  19. from celery.five import items, values
  20. from celery.utils.text import abbr, abbrtask
  21. BORDER_SPACING = 4
  22. LEFT_BORDER_OFFSET = 3
  23. UUID_WIDTH = 36
  24. STATE_WIDTH = 8
  25. TIMESTAMP_WIDTH = 8
  26. MIN_WORKER_WIDTH = 15
  27. MIN_TASK_WIDTH = 16
  28. # this module is considered experimental
  29. # we don't care about coverage.
  30. STATUS_SCREEN = """\
  31. events: {s.event_count} tasks:{s.task_count} workers:{w_alive}/{w_all}
  32. """
  33. class CursesMonitor(object): # pragma: no cover
  34. keymap = {}
  35. win = None
  36. screen_width = None
  37. screen_delay = 10
  38. selected_task = None
  39. selected_position = 0
  40. selected_str = 'Selected: '
  41. foreground = curses.COLOR_BLACK
  42. background = curses.COLOR_WHITE
  43. online_str = 'Workers online: '
  44. help_title = 'Keys: '
  45. help = ('j:up k:down i:info t:traceback r:result c:revoke ^c: quit')
  46. greet = 'celery events {0}'.format(VERSION_BANNER)
  47. info_str = 'Info: '
  48. def __init__(self, state, keymap=None, app=None):
  49. self.app = app_or_default(app)
  50. self.keymap = keymap or self.keymap
  51. self.state = state
  52. default_keymap = {'J': self.move_selection_down,
  53. 'K': self.move_selection_up,
  54. 'C': self.revoke_selection,
  55. 'T': self.selection_traceback,
  56. 'R': self.selection_result,
  57. 'I': self.selection_info,
  58. 'L': self.selection_rate_limit}
  59. self.keymap = dict(default_keymap, **self.keymap)
  60. def format_row(self, uuid, task, worker, timestamp, state):
  61. mx = self.display_width
  62. # include spacing
  63. detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
  64. uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH
  65. if uuid_space < UUID_WIDTH:
  66. uuid_width = uuid_space
  67. else:
  68. uuid_width = UUID_WIDTH
  69. detail_width = detail_width - uuid_width - 1
  70. task_width = int(ceil(detail_width / 2.0))
  71. worker_width = detail_width - task_width - 1
  72. uuid = abbr(uuid, uuid_width).ljust(uuid_width)
  73. worker = abbr(worker, worker_width).ljust(worker_width)
  74. task = abbrtask(task, task_width).ljust(task_width)
  75. state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
  76. timestamp = timestamp.ljust(TIMESTAMP_WIDTH)
  77. row = '{0} {1} {2} {3} {4} '.format(uuid, worker, task,
  78. timestamp, state)
  79. if self.screen_width is None:
  80. self.screen_width = len(row[:mx])
  81. return row[:mx]
  82. @property
  83. def screen_width(self):
  84. _, mx = self.win.getmaxyx()
  85. return mx
  86. @property
  87. def screen_height(self):
  88. my, _ = self.win.getmaxyx()
  89. return my
  90. @property
  91. def display_width(self):
  92. _, mx = self.win.getmaxyx()
  93. return mx - BORDER_SPACING
  94. @property
  95. def display_height(self):
  96. my, _ = self.win.getmaxyx()
  97. return my - 10
  98. @property
  99. def limit(self):
  100. return self.display_height
  101. def find_position(self):
  102. if not self.tasks:
  103. return 0
  104. for i, e in enumerate(self.tasks):
  105. if self.selected_task == e[0]:
  106. return i
  107. return 0
  108. def move_selection_up(self):
  109. self.move_selection(-1)
  110. def move_selection_down(self):
  111. self.move_selection(1)
  112. def move_selection(self, direction=1):
  113. if not self.tasks:
  114. return
  115. pos = self.find_position()
  116. try:
  117. self.selected_task = self.tasks[pos + direction][0]
  118. except IndexError:
  119. self.selected_task = self.tasks[0][0]
  120. keyalias = {curses.KEY_DOWN: 'J',
  121. curses.KEY_UP: 'K',
  122. curses.KEY_ENTER: 'I'}
  123. def handle_keypress(self):
  124. try:
  125. key = self.win.getkey().upper()
  126. except:
  127. return
  128. key = self.keyalias.get(key) or key
  129. handler = self.keymap.get(key)
  130. if handler is not None:
  131. handler()
  132. def alert(self, callback, title=None):
  133. self.win.erase()
  134. my, mx = self.win.getmaxyx()
  135. y = blank_line = count(2)
  136. if title:
  137. self.win.addstr(next(y), 3, title,
  138. curses.A_BOLD | curses.A_UNDERLINE)
  139. next(blank_line)
  140. callback(my, mx, next(y))
  141. self.win.addstr(my - 1, 0, 'Press any key to continue...',
  142. curses.A_BOLD)
  143. self.win.refresh()
  144. while 1:
  145. try:
  146. return self.win.getkey().upper()
  147. except:
  148. pass
  149. def selection_rate_limit(self):
  150. if not self.selected_task:
  151. return curses.beep()
  152. task = self.state.tasks[self.selected_task]
  153. if not task.name:
  154. return curses.beep()
  155. my, mx = self.win.getmaxyx()
  156. r = 'New rate limit: '
  157. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  158. self.win.addstr(my - 2, len(r) + 3, ' ' * (mx - len(r)))
  159. rlimit = self.readline(my - 2, 3 + len(r))
  160. if rlimit:
  161. reply = self.app.control.rate_limit(task.name,
  162. rlimit.strip(), reply=True)
  163. self.alert_remote_control_reply(reply)
  164. def alert_remote_control_reply(self, reply):
  165. def callback(my, mx, xs):
  166. y = count(xs)
  167. if not reply:
  168. self.win.addstr(next(y), 3,
  169. 'No replies received in 1s deadline.',
  170. curses.A_BOLD + curses.color_pair(2))
  171. return
  172. for subreply in reply:
  173. curline = next(y)
  174. host, response = next(items(subreply))
  175. host = '{0}: '.format(host)
  176. self.win.addstr(curline, 3, host, curses.A_BOLD)
  177. attr = curses.A_NORMAL
  178. text = ''
  179. if 'error' in response:
  180. text = response['error']
  181. attr |= curses.color_pair(2)
  182. elif 'ok' in response:
  183. text = response['ok']
  184. attr |= curses.color_pair(3)
  185. self.win.addstr(curline, 3 + len(host), text, attr)
  186. return self.alert(callback, 'Remote Control Command Replies')
  187. def readline(self, x, y):
  188. buffer = str()
  189. curses.echo()
  190. try:
  191. i = 0
  192. while 1:
  193. ch = self.win.getch(x, y + i)
  194. if ch != -1:
  195. if ch in (10, curses.KEY_ENTER): # enter
  196. break
  197. if ch in (27, ):
  198. buffer = str()
  199. break
  200. buffer += chr(ch)
  201. i += 1
  202. finally:
  203. curses.noecho()
  204. return buffer
  205. def revoke_selection(self):
  206. if not self.selected_task:
  207. return curses.beep()
  208. reply = self.app.control.revoke(self.selected_task, reply=True)
  209. self.alert_remote_control_reply(reply)
  210. def selection_info(self):
  211. if not self.selected_task:
  212. return
  213. def alert_callback(mx, my, xs):
  214. my, mx = self.win.getmaxyx()
  215. y = count(xs)
  216. task = self.state.tasks[self.selected_task]
  217. info = task.info(extra=['state'])
  218. infoitems = [('args', info.pop('args', None)),
  219. ('kwargs', info.pop('kwargs', None))
  220. ] + list(info.items())
  221. for key, value in infoitems:
  222. if key is None:
  223. continue
  224. value = str(value)
  225. curline = next(y)
  226. keys = key + ': '
  227. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  228. wrapped = wrap(value, mx - 2)
  229. if len(wrapped) == 1:
  230. self.win.addstr(curline, len(keys) + 3,
  231. abbr(wrapped[0],
  232. self.screen_width - (len(keys) + 3)))
  233. else:
  234. for subline in wrapped:
  235. nexty = next(y)
  236. if nexty >= my - 1:
  237. subline = ' ' * 4 + '[...]'
  238. elif nexty >= my:
  239. break
  240. self.win.addstr(nexty, 3,
  241. abbr(' ' * 4 + subline, self.screen_width - 4),
  242. curses.A_NORMAL)
  243. return self.alert(alert_callback,
  244. 'Task details for {0.selected_task}'.format(self))
  245. def selection_traceback(self):
  246. if not self.selected_task:
  247. return curses.beep()
  248. task = self.state.tasks[self.selected_task]
  249. if task.state not in states.EXCEPTION_STATES:
  250. return curses.beep()
  251. def alert_callback(my, mx, xs):
  252. y = count(xs)
  253. for line in task.traceback.split('\n'):
  254. self.win.addstr(next(y), 3, line)
  255. return self.alert(alert_callback,
  256. 'Task Exception Traceback for {0.selected_task}'.format(self))
  257. def selection_result(self):
  258. if not self.selected_task:
  259. return
  260. def alert_callback(my, mx, xs):
  261. y = count(xs)
  262. task = self.state.tasks[self.selected_task]
  263. result = getattr(task, 'result', None) or getattr(task,
  264. 'exception', None)
  265. for line in wrap(result, mx - 2):
  266. self.win.addstr(next(y), 3, line)
  267. return self.alert(alert_callback,
  268. 'Task Result for {0.selected_task}'.format(self))
  269. def display_task_row(self, lineno, task):
  270. state_color = self.state_colors.get(task.state)
  271. attr = curses.A_NORMAL
  272. if task.uuid == self.selected_task:
  273. attr = curses.A_STANDOUT
  274. timestamp = datetime.utcfromtimestamp(
  275. task.timestamp or time.time())
  276. timef = timestamp.strftime('%H:%M:%S')
  277. hostname = task.worker.hostname if task.worker else '*NONE*'
  278. line = self.format_row(task.uuid, task.name,
  279. hostname,
  280. timef, task.state)
  281. self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)
  282. if state_color:
  283. self.win.addstr(lineno,
  284. len(line) - STATE_WIDTH + BORDER_SPACING - 1,
  285. task.state, state_color | attr)
  286. def draw(self):
  287. win = self.win
  288. self.handle_keypress()
  289. x = LEFT_BORDER_OFFSET
  290. y = blank_line = count(2)
  291. my, mx = win.getmaxyx()
  292. win.erase()
  293. win.bkgd(' ', curses.color_pair(1))
  294. win.border()
  295. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  296. next(blank_line)
  297. win.addstr(next(y), x, self.format_row('UUID', 'TASK',
  298. 'WORKER', 'TIME', 'STATE'),
  299. curses.A_BOLD | curses.A_UNDERLINE)
  300. tasks = self.tasks
  301. if tasks:
  302. for row, (uuid, task) in enumerate(tasks):
  303. if row > self.display_height:
  304. break
  305. if task.uuid:
  306. lineno = next(y)
  307. self.display_task_row(lineno, task)
  308. # -- Footer
  309. next(blank_line)
  310. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)
  311. # Selected Task Info
  312. if self.selected_task:
  313. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  314. info = 'Missing extended info'
  315. detail = ''
  316. try:
  317. selection = self.state.tasks[self.selected_task]
  318. except KeyError:
  319. pass
  320. else:
  321. info = selection.info()
  322. if 'runtime' in info:
  323. info['runtime'] = '{0:.2fs}'.format(info['runtime'])
  324. if 'result' in info:
  325. info['result'] = abbr(info['result'], 16)
  326. info = ' '.join('{0}={1}'.format(key, value)
  327. for key, value in items(info))
  328. detail = '... -> key i'
  329. infowin = abbr(info,
  330. self.screen_width - len(self.selected_str) - 2,
  331. detail)
  332. win.addstr(my - 5, x + len(self.selected_str), infowin)
  333. # Make ellipsis bold
  334. if detail in infowin:
  335. detailpos = len(infowin) - len(detail)
  336. win.addstr(my - 5, x + len(self.selected_str) + detailpos,
  337. detail, curses.A_BOLD)
  338. else:
  339. win.addstr(my - 5, x, 'No task selected', curses.A_NORMAL)
  340. # Workers
  341. if self.workers:
  342. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  343. win.addstr(my - 4, x + len(self.online_str),
  344. ', '.join(sorted(self.workers)), curses.A_NORMAL)
  345. else:
  346. win.addstr(my - 4, x, 'No workers discovered.')
  347. # Info
  348. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  349. win.addstr(my - 3, x + len(self.info_str),
  350. STATUS_SCREEN.format(s=self.state,
  351. w_alive=len([w for w in values(self.state.workers)
  352. if w.alive]),
  353. w_all=len(self.state.workers)),
  354. curses.A_DIM)
  355. # Help
  356. self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
  357. self.safe_add_str(my - 2, x + len(self.help_title), self.help,
  358. curses.A_DIM)
  359. win.refresh()
  360. def safe_add_str(self, y, x, string, *args, **kwargs):
  361. if x + len(string) > self.screen_width:
  362. string = string[:self.screen_width - x]
  363. self.win.addstr(y, x, string, *args, **kwargs)
  364. def init_screen(self):
  365. self.win = curses.initscr()
  366. self.win.nodelay(True)
  367. self.win.keypad(True)
  368. curses.start_color()
  369. curses.init_pair(1, self.foreground, self.background)
  370. # exception states
  371. curses.init_pair(2, curses.COLOR_RED, self.background)
  372. # successful state
  373. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  374. # revoked state
  375. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  376. # greeting
  377. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  378. # started state
  379. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  380. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  381. states.REVOKED: curses.color_pair(4),
  382. states.STARTED: curses.color_pair(6)}
  383. for state in states.EXCEPTION_STATES:
  384. self.state_colors[state] = curses.color_pair(2)
  385. curses.cbreak()
  386. def resetscreen(self):
  387. curses.nocbreak()
  388. self.win.keypad(False)
  389. curses.echo()
  390. curses.endwin()
  391. def nap(self):
  392. curses.napms(self.screen_delay)
  393. @property
  394. def tasks(self):
  395. return list(self.state.tasks_by_time(limit=self.limit))
  396. @property
  397. def workers(self):
  398. return [hostname
  399. for hostname, w in items(self.state.workers)
  400. if w.alive]
  401. class DisplayThread(threading.Thread): # pragma: no cover
  402. def __init__(self, display):
  403. self.display = display
  404. self.shutdown = False
  405. threading.Thread.__init__(self)
  406. def run(self):
  407. while not self.shutdown:
  408. self.display.draw()
  409. self.display.nap()
  410. def capture_events(app, state, display): # pragma: no cover
  411. def on_connection_error(exc, interval):
  412. print('Connection Error: {0!r}. Retry in {1}s.'.format(
  413. exc, interval), file=sys.stderr)
  414. while 1:
  415. print('-> evtop: starting capture...', file=sys.stderr)
  416. with app.connection() as conn:
  417. try:
  418. conn.ensure_connection(on_connection_error,
  419. app.conf.BROKER_CONNECTION_MAX_RETRIES)
  420. recv = app.events.Receiver(conn, handlers={'*': state.event})
  421. display.resetscreen()
  422. display.init_screen()
  423. recv.capture()
  424. except conn.connection_errors + conn.channel_errors as exc:
  425. print('Connection lost: {0!r}'.format(exc), file=sys.stderr)
  426. def evtop(app=None): # pragma: no cover
  427. app = app_or_default(app)
  428. state = app.events.State()
  429. display = CursesMonitor(state, app=app)
  430. display.init_screen()
  431. refresher = DisplayThread(display)
  432. refresher.start()
  433. try:
  434. capture_events(app, state, display)
  435. except Exception:
  436. refresher.shutdown = True
  437. refresher.join()
  438. display.resetscreen()
  439. raise
  440. except (KeyboardInterrupt, SystemExit):
  441. refresher.shutdown = True
  442. refresher.join()
  443. display.resetscreen()
  444. if __name__ == '__main__': # pragma: no cover
  445. evtop()