cursesmon.py 17 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.cursesmon
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. Graphical monitor of Celery events using curses.
  6. """
  7. from __future__ import absolute_import, print_function
  8. import curses
  9. import sys
  10. import threading
  11. import time
  12. from datetime import datetime
  13. from itertools import count
  14. from textwrap import wrap
  15. from math import ceil
  16. from celery import VERSION_BANNER
  17. from celery import states
  18. from celery.app import app_or_default
  19. from celery.five import items, values
  20. from celery.utils.text import abbr, abbrtask
  21. BORDER_SPACING = 4
  22. LEFT_BORDER_OFFSET = 3
  23. UUID_WIDTH = 36
  24. STATE_WIDTH = 8
  25. TIMESTAMP_WIDTH = 8
  26. MIN_WORKER_WIDTH = 15
  27. MIN_TASK_WIDTH = 16
  28. # this module is considered experimental
  29. # we don't care about coverage.
  30. STATUS_SCREEN = """\
  31. events: {s.event_count} tasks:{s.task_count} workers:{w_alive}/{w_all}
  32. """
  33. class CursesMonitor(object): # pragma: no cover
  34. keymap = {}
  35. win = None
  36. screen_width = None
  37. screen_delay = 10
  38. selected_task = None
  39. selected_position = 0
  40. selected_str = 'Selected: '
  41. foreground = curses.COLOR_BLACK
  42. background = curses.COLOR_WHITE
  43. online_str = 'Workers online: '
  44. help_title = 'Keys: '
  45. help = ('j:down k:up i:info t:traceback r:result c:revoke ^c: quit')
  46. greet = 'celery events {0}'.format(VERSION_BANNER)
  47. info_str = 'Info: '
  48. def __init__(self, state, keymap=None, app=None):
  49. self.app = app_or_default(app)
  50. self.keymap = keymap or self.keymap
  51. self.state = state
  52. default_keymap = {'J': self.move_selection_down,
  53. 'K': self.move_selection_up,
  54. 'C': self.revoke_selection,
  55. 'T': self.selection_traceback,
  56. 'R': self.selection_result,
  57. 'I': self.selection_info,
  58. 'L': self.selection_rate_limit}
  59. self.keymap = dict(default_keymap, **self.keymap)
  60. def format_row(self, uuid, task, worker, timestamp, state):
  61. mx = self.display_width
  62. # include spacing
  63. detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
  64. uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH
  65. if uuid_space < UUID_WIDTH:
  66. uuid_width = uuid_space
  67. else:
  68. uuid_width = UUID_WIDTH
  69. detail_width = detail_width - uuid_width - 1
  70. task_width = int(ceil(detail_width / 2.0))
  71. worker_width = detail_width - task_width - 1
  72. uuid = abbr(uuid, uuid_width).ljust(uuid_width)
  73. worker = abbr(worker, worker_width).ljust(worker_width)
  74. task = abbrtask(task, task_width).ljust(task_width)
  75. state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
  76. timestamp = timestamp.ljust(TIMESTAMP_WIDTH)
  77. row = '{0} {1} {2} {3} {4} '.format(uuid, worker, task,
  78. timestamp, state)
  79. if self.screen_width is None:
  80. self.screen_width = len(row[:mx])
  81. return row[:mx]
  82. @property
  83. def screen_width(self):
  84. _, mx = self.win.getmaxyx()
  85. return mx
  86. @property
  87. def screen_height(self):
  88. my, _ = self.win.getmaxyx()
  89. return my
  90. @property
  91. def display_width(self):
  92. _, mx = self.win.getmaxyx()
  93. return mx - BORDER_SPACING
  94. @property
  95. def display_height(self):
  96. my, _ = self.win.getmaxyx()
  97. return my - 10
  98. @property
  99. def limit(self):
  100. return self.display_height
  101. def find_position(self):
  102. if not self.tasks:
  103. return 0
  104. for i, e in enumerate(self.tasks):
  105. if self.selected_task == e[0]:
  106. return i
  107. return 0
  108. def move_selection_up(self):
  109. self.move_selection(-1)
  110. def move_selection_down(self):
  111. self.move_selection(1)
  112. def move_selection(self, direction=1):
  113. if not self.tasks:
  114. return
  115. pos = self.find_position()
  116. try:
  117. self.selected_task = self.tasks[pos + direction][0]
  118. except IndexError:
  119. self.selected_task = self.tasks[0][0]
  120. keyalias = {curses.KEY_DOWN: 'J',
  121. curses.KEY_UP: 'K',
  122. curses.KEY_ENTER: 'I'}
  123. def handle_keypress(self):
  124. try:
  125. key = self.win.getkey().upper()
  126. except:
  127. return
  128. key = self.keyalias.get(key) or key
  129. handler = self.keymap.get(key)
  130. if handler is not None:
  131. handler()
  132. def alert(self, callback, title=None):
  133. self.win.erase()
  134. my, mx = self.win.getmaxyx()
  135. y = blank_line = count(2)
  136. if title:
  137. self.win.addstr(next(y), 3, title,
  138. curses.A_BOLD | curses.A_UNDERLINE)
  139. next(blank_line)
  140. callback(my, mx, next(y))
  141. self.win.addstr(my - 1, 0, 'Press any key to continue...',
  142. curses.A_BOLD)
  143. self.win.refresh()
  144. while 1:
  145. try:
  146. return self.win.getkey().upper()
  147. except:
  148. pass
  149. def selection_rate_limit(self):
  150. if not self.selected_task:
  151. return curses.beep()
  152. task = self.state.tasks[self.selected_task]
  153. if not task.name:
  154. return curses.beep()
  155. my, mx = self.win.getmaxyx()
  156. r = 'New rate limit: '
  157. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  158. self.win.addstr(my - 2, len(r) + 3, ' ' * (mx - len(r)))
  159. rlimit = self.readline(my - 2, 3 + len(r))
  160. if rlimit:
  161. reply = self.app.control.rate_limit(task.name,
  162. rlimit.strip(), reply=True)
  163. self.alert_remote_control_reply(reply)
  164. def alert_remote_control_reply(self, reply):
  165. def callback(my, mx, xs):
  166. y = count(xs)
  167. if not reply:
  168. self.win.addstr(
  169. next(y), 3, 'No replies received in 1s deadline.',
  170. curses.A_BOLD + curses.color_pair(2),
  171. )
  172. return
  173. for subreply in reply:
  174. curline = next(y)
  175. host, response = next(items(subreply))
  176. host = '{0}: '.format(host)
  177. self.win.addstr(curline, 3, host, curses.A_BOLD)
  178. attr = curses.A_NORMAL
  179. text = ''
  180. if 'error' in response:
  181. text = response['error']
  182. attr |= curses.color_pair(2)
  183. elif 'ok' in response:
  184. text = response['ok']
  185. attr |= curses.color_pair(3)
  186. self.win.addstr(curline, 3 + len(host), text, attr)
  187. return self.alert(callback, 'Remote Control Command Replies')
  188. def readline(self, x, y):
  189. buffer = str()
  190. curses.echo()
  191. try:
  192. i = 0
  193. while 1:
  194. ch = self.win.getch(x, y + i)
  195. if ch != -1:
  196. if ch in (10, curses.KEY_ENTER): # enter
  197. break
  198. if ch in (27, ):
  199. buffer = str()
  200. break
  201. buffer += chr(ch)
  202. i += 1
  203. finally:
  204. curses.noecho()
  205. return buffer
  206. def revoke_selection(self):
  207. if not self.selected_task:
  208. return curses.beep()
  209. reply = self.app.control.revoke(self.selected_task, reply=True)
  210. self.alert_remote_control_reply(reply)
  211. def selection_info(self):
  212. if not self.selected_task:
  213. return
  214. def alert_callback(mx, my, xs):
  215. my, mx = self.win.getmaxyx()
  216. y = count(xs)
  217. task = self.state.tasks[self.selected_task]
  218. info = task.info(extra=['state'])
  219. infoitems = [
  220. ('args', info.pop('args', None)),
  221. ('kwargs', info.pop('kwargs', None))
  222. ] + list(info.items())
  223. for key, value in infoitems:
  224. if key is None:
  225. continue
  226. value = str(value)
  227. curline = next(y)
  228. keys = key + ': '
  229. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  230. wrapped = wrap(value, mx - 2)
  231. if len(wrapped) == 1:
  232. self.win.addstr(
  233. curline, len(keys) + 3,
  234. abbr(wrapped[0],
  235. self.screen_width - (len(keys) + 3)))
  236. else:
  237. for subline in wrapped:
  238. nexty = next(y)
  239. if nexty >= my - 1:
  240. subline = ' ' * 4 + '[...]'
  241. elif nexty >= my:
  242. break
  243. self.win.addstr(
  244. nexty, 3,
  245. abbr(' ' * 4 + subline, self.screen_width - 4),
  246. curses.A_NORMAL,
  247. )
  248. return self.alert(
  249. alert_callback, 'Task details for {0.selected_task}'.format(self),
  250. )
  251. def selection_traceback(self):
  252. if not self.selected_task:
  253. return curses.beep()
  254. task = self.state.tasks[self.selected_task]
  255. if task.state not in states.EXCEPTION_STATES:
  256. return curses.beep()
  257. def alert_callback(my, mx, xs):
  258. y = count(xs)
  259. for line in task.traceback.split('\n'):
  260. self.win.addstr(next(y), 3, line)
  261. return self.alert(
  262. alert_callback,
  263. 'Task Exception Traceback for {0.selected_task}'.format(self),
  264. )
  265. def selection_result(self):
  266. if not self.selected_task:
  267. return
  268. def alert_callback(my, mx, xs):
  269. y = count(xs)
  270. task = self.state.tasks[self.selected_task]
  271. result = (getattr(task, 'result', None)
  272. or getattr(task, 'exception', None))
  273. for line in wrap(result, mx - 2):
  274. self.win.addstr(next(y), 3, line)
  275. return self.alert(
  276. alert_callback,
  277. 'Task Result for {0.selected_task}'.format(self),
  278. )
  279. def display_task_row(self, lineno, task):
  280. state_color = self.state_colors.get(task.state)
  281. attr = curses.A_NORMAL
  282. if task.uuid == self.selected_task:
  283. attr = curses.A_STANDOUT
  284. timestamp = datetime.utcfromtimestamp(
  285. task.timestamp or time.time(),
  286. )
  287. timef = timestamp.strftime('%H:%M:%S')
  288. hostname = task.worker.hostname if task.worker else '*NONE*'
  289. line = self.format_row(task.uuid, task.name,
  290. hostname,
  291. timef, task.state)
  292. self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)
  293. if state_color:
  294. self.win.addstr(lineno,
  295. len(line) - STATE_WIDTH + BORDER_SPACING - 1,
  296. task.state, state_color | attr)
  297. def draw(self):
  298. win = self.win
  299. self.handle_keypress()
  300. x = LEFT_BORDER_OFFSET
  301. y = blank_line = count(2)
  302. my, mx = win.getmaxyx()
  303. win.erase()
  304. win.bkgd(' ', curses.color_pair(1))
  305. win.border()
  306. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  307. next(blank_line)
  308. win.addstr(next(y), x, self.format_row('UUID', 'TASK',
  309. 'WORKER', 'TIME', 'STATE'),
  310. curses.A_BOLD | curses.A_UNDERLINE)
  311. tasks = self.tasks
  312. if tasks:
  313. for row, (uuid, task) in enumerate(tasks):
  314. if row > self.display_height:
  315. break
  316. if task.uuid:
  317. lineno = next(y)
  318. self.display_task_row(lineno, task)
  319. # -- Footer
  320. next(blank_line)
  321. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)
  322. # Selected Task Info
  323. if self.selected_task:
  324. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  325. info = 'Missing extended info'
  326. detail = ''
  327. try:
  328. selection = self.state.tasks[self.selected_task]
  329. except KeyError:
  330. pass
  331. else:
  332. info = selection.info()
  333. if 'runtime' in info:
  334. info['runtime'] = '{0:.2f}'.format(info['runtime'])
  335. if 'result' in info:
  336. info['result'] = abbr(info['result'], 16)
  337. info = ' '.join(
  338. '{0}={1}'.format(key, value) for key, value in items(info)
  339. )
  340. detail = '... -> key i'
  341. infowin = abbr(info,
  342. self.screen_width - len(self.selected_str) - 2,
  343. detail)
  344. win.addstr(my - 5, x + len(self.selected_str), infowin)
  345. # Make ellipsis bold
  346. if detail in infowin:
  347. detailpos = len(infowin) - len(detail)
  348. win.addstr(my - 5, x + len(self.selected_str) + detailpos,
  349. detail, curses.A_BOLD)
  350. else:
  351. win.addstr(my - 5, x, 'No task selected', curses.A_NORMAL)
  352. # Workers
  353. if self.workers:
  354. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  355. win.addstr(my - 4, x + len(self.online_str),
  356. ', '.join(sorted(self.workers)), curses.A_NORMAL)
  357. else:
  358. win.addstr(my - 4, x, 'No workers discovered.')
  359. # Info
  360. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  361. win.addstr(
  362. my - 3, x + len(self.info_str),
  363. STATUS_SCREEN.format(
  364. s=self.state,
  365. w_alive=len([w for w in values(self.state.workers)
  366. if w.alive]),
  367. w_all=len(self.state.workers),
  368. ),
  369. curses.A_DIM,
  370. )
  371. # Help
  372. self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
  373. self.safe_add_str(my - 2, x + len(self.help_title), self.help,
  374. curses.A_DIM)
  375. win.refresh()
  376. def safe_add_str(self, y, x, string, *args, **kwargs):
  377. if x + len(string) > self.screen_width:
  378. string = string[:self.screen_width - x]
  379. self.win.addstr(y, x, string, *args, **kwargs)
  380. def init_screen(self):
  381. self.win = curses.initscr()
  382. self.win.nodelay(True)
  383. self.win.keypad(True)
  384. curses.start_color()
  385. curses.init_pair(1, self.foreground, self.background)
  386. # exception states
  387. curses.init_pair(2, curses.COLOR_RED, self.background)
  388. # successful state
  389. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  390. # revoked state
  391. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  392. # greeting
  393. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  394. # started state
  395. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  396. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  397. states.REVOKED: curses.color_pair(4),
  398. states.STARTED: curses.color_pair(6)}
  399. for state in states.EXCEPTION_STATES:
  400. self.state_colors[state] = curses.color_pair(2)
  401. curses.cbreak()
  402. def resetscreen(self):
  403. curses.nocbreak()
  404. self.win.keypad(False)
  405. curses.echo()
  406. curses.endwin()
  407. def nap(self):
  408. curses.napms(self.screen_delay)
  409. @property
  410. def tasks(self):
  411. return list(self.state.tasks_by_time(limit=self.limit))
  412. @property
  413. def workers(self):
  414. return [hostname for hostname, w in items(self.state.workers)
  415. if w.alive]
  416. class DisplayThread(threading.Thread): # pragma: no cover
  417. def __init__(self, display):
  418. self.display = display
  419. self.shutdown = False
  420. threading.Thread.__init__(self)
  421. def run(self):
  422. while not self.shutdown:
  423. self.display.draw()
  424. self.display.nap()
  425. def capture_events(app, state, display): # pragma: no cover
  426. def on_connection_error(exc, interval):
  427. print('Connection Error: {0!r}. Retry in {1}s.'.format(
  428. exc, interval), file=sys.stderr)
  429. while 1:
  430. print('-> evtop: starting capture...', file=sys.stderr)
  431. with app.connection() as conn:
  432. try:
  433. conn.ensure_connection(on_connection_error,
  434. app.conf.BROKER_CONNECTION_MAX_RETRIES)
  435. recv = app.events.Receiver(conn, handlers={'*': state.event})
  436. display.resetscreen()
  437. display.init_screen()
  438. recv.capture()
  439. except conn.connection_errors + conn.channel_errors as exc:
  440. print('Connection lost: {0!r}'.format(exc), file=sys.stderr)
  441. def evtop(app=None): # pragma: no cover
  442. app = app_or_default(app)
  443. state = app.events.State()
  444. display = CursesMonitor(state, app=app)
  445. display.init_screen()
  446. refresher = DisplayThread(display)
  447. refresher.start()
  448. try:
  449. capture_events(app, state, display)
  450. except Exception:
  451. refresher.shutdown = True
  452. refresher.join()
  453. display.resetscreen()
  454. raise
  455. except (KeyboardInterrupt, SystemExit):
  456. refresher.shutdown = True
  457. refresher.join()
  458. display.resetscreen()
  459. if __name__ == '__main__': # pragma: no cover
  460. evtop()