# cursesmon.py
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.cursesmon
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. Graphical monitor of Celery events using curses.
  6. """
  7. from __future__ import absolute_import, print_function
  8. import curses
  9. import sys
  10. import threading
  11. import time
  12. from datetime import datetime
  13. from itertools import count
  14. from textwrap import wrap
  15. from math import ceil
  16. from celery import VERSION_BANNER
  17. from celery import states
  18. from celery.app import app_or_default
  19. from celery.five import items, values
  20. from celery.utils.text import abbr, abbrtask
  21. BORDER_SPACING = 4
  22. LEFT_BORDER_OFFSET = 3
  23. UUID_WIDTH = 36
  24. STATE_WIDTH = 8
  25. TIMESTAMP_WIDTH = 8
  26. MIN_WORKER_WIDTH = 15
  27. MIN_TASK_WIDTH = 16
  28. # this module is considered experimental
  29. # we don't care about coverage.
  30. STATUS_SCREEN = """\
  31. events: {s.event_count} tasks:{s.task_count} workers:{w_alive}/{w_all}
  32. """
  33. class CursesMonitor(object): # pragma: no cover
  34. keymap = {}
  35. win = None
  36. screen_width = None
  37. screen_delay = 10
  38. selected_task = None
  39. selected_position = 0
  40. selected_str = 'Selected: '
  41. foreground = curses.COLOR_BLACK
  42. background = curses.COLOR_WHITE
  43. online_str = 'Workers online: '
  44. help_title = 'Keys: '
  45. help = ('j:down k:up i:info t:traceback r:result c:revoke ^c: quit')
  46. greet = 'celery events {0}'.format(VERSION_BANNER)
  47. info_str = 'Info: '
  48. def __init__(self, state, keymap=None, app=None):
  49. self.app = app_or_default(app)
  50. self.keymap = keymap or self.keymap
  51. self.state = state
  52. default_keymap = {'J': self.move_selection_down,
  53. 'K': self.move_selection_up,
  54. 'C': self.revoke_selection,
  55. 'T': self.selection_traceback,
  56. 'R': self.selection_result,
  57. 'I': self.selection_info,
  58. 'L': self.selection_rate_limit}
  59. self.keymap = dict(default_keymap, **self.keymap)
  60. self.lock = threading.RLock()
  61. def format_row(self, uuid, task, worker, timestamp, state):
  62. mx = self.display_width
  63. # include spacing
  64. detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
  65. uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH
  66. if uuid_space < UUID_WIDTH:
  67. uuid_width = uuid_space
  68. else:
  69. uuid_width = UUID_WIDTH
  70. detail_width = detail_width - uuid_width - 1
  71. task_width = int(ceil(detail_width / 2.0))
  72. worker_width = detail_width - task_width - 1
  73. uuid = abbr(uuid, uuid_width).ljust(uuid_width)
  74. worker = abbr(worker, worker_width).ljust(worker_width)
  75. task = abbrtask(task, task_width).ljust(task_width)
  76. state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
  77. timestamp = timestamp.ljust(TIMESTAMP_WIDTH)
  78. row = '{0} {1} {2} {3} {4} '.format(uuid, worker, task,
  79. timestamp, state)
  80. if self.screen_width is None:
  81. self.screen_width = len(row[:mx])
  82. return row[:mx]
  83. @property
  84. def screen_width(self):
  85. _, mx = self.win.getmaxyx()
  86. return mx
  87. @property
  88. def screen_height(self):
  89. my, _ = self.win.getmaxyx()
  90. return my
  91. @property
  92. def display_width(self):
  93. _, mx = self.win.getmaxyx()
  94. return mx - BORDER_SPACING
  95. @property
  96. def display_height(self):
  97. my, _ = self.win.getmaxyx()
  98. return my - 10
  99. @property
  100. def limit(self):
  101. return self.display_height
  102. def find_position(self):
  103. if not self.tasks:
  104. return 0
  105. for i, e in enumerate(self.tasks):
  106. if self.selected_task == e[0]:
  107. return i
  108. return 0
  109. def move_selection_up(self):
  110. self.move_selection(-1)
  111. def move_selection_down(self):
  112. self.move_selection(1)
  113. def move_selection(self, direction=1):
  114. if not self.tasks:
  115. return
  116. pos = self.find_position()
  117. try:
  118. self.selected_task = self.tasks[pos + direction][0]
  119. except IndexError:
  120. self.selected_task = self.tasks[0][0]
  121. keyalias = {curses.KEY_DOWN: 'J',
  122. curses.KEY_UP: 'K',
  123. curses.KEY_ENTER: 'I'}
  124. def handle_keypress(self):
  125. try:
  126. key = self.win.getkey().upper()
  127. except:
  128. return
  129. key = self.keyalias.get(key) or key
  130. handler = self.keymap.get(key)
  131. if handler is not None:
  132. handler()
  133. def alert(self, callback, title=None):
  134. self.win.erase()
  135. my, mx = self.win.getmaxyx()
  136. y = blank_line = count(2)
  137. if title:
  138. self.win.addstr(next(y), 3, title,
  139. curses.A_BOLD | curses.A_UNDERLINE)
  140. next(blank_line)
  141. callback(my, mx, next(y))
  142. self.win.addstr(my - 1, 0, 'Press any key to continue...',
  143. curses.A_BOLD)
  144. self.win.refresh()
  145. while 1:
  146. try:
  147. return self.win.getkey().upper()
  148. except:
  149. pass
  150. def selection_rate_limit(self):
  151. if not self.selected_task:
  152. return curses.beep()
  153. task = self.state.tasks[self.selected_task]
  154. if not task.name:
  155. return curses.beep()
  156. my, mx = self.win.getmaxyx()
  157. r = 'New rate limit: '
  158. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  159. self.win.addstr(my - 2, len(r) + 3, ' ' * (mx - len(r)))
  160. rlimit = self.readline(my - 2, 3 + len(r))
  161. if rlimit:
  162. reply = self.app.control.rate_limit(task.name,
  163. rlimit.strip(), reply=True)
  164. self.alert_remote_control_reply(reply)
  165. def alert_remote_control_reply(self, reply):
  166. def callback(my, mx, xs):
  167. y = count(xs)
  168. if not reply:
  169. self.win.addstr(
  170. next(y), 3, 'No replies received in 1s deadline.',
  171. curses.A_BOLD + curses.color_pair(2),
  172. )
  173. return
  174. for subreply in reply:
  175. curline = next(y)
  176. host, response = next(items(subreply))
  177. host = '{0}: '.format(host)
  178. self.win.addstr(curline, 3, host, curses.A_BOLD)
  179. attr = curses.A_NORMAL
  180. text = ''
  181. if 'error' in response:
  182. text = response['error']
  183. attr |= curses.color_pair(2)
  184. elif 'ok' in response:
  185. text = response['ok']
  186. attr |= curses.color_pair(3)
  187. self.win.addstr(curline, 3 + len(host), text, attr)
  188. return self.alert(callback, 'Remote Control Command Replies')
  189. def readline(self, x, y):
  190. buffer = str()
  191. curses.echo()
  192. try:
  193. i = 0
  194. while 1:
  195. ch = self.win.getch(x, y + i)
  196. if ch != -1:
  197. if ch in (10, curses.KEY_ENTER): # enter
  198. break
  199. if ch in (27, ):
  200. buffer = str()
  201. break
  202. buffer += chr(ch)
  203. i += 1
  204. finally:
  205. curses.noecho()
  206. return buffer
  207. def revoke_selection(self):
  208. if not self.selected_task:
  209. return curses.beep()
  210. reply = self.app.control.revoke(self.selected_task, reply=True)
  211. self.alert_remote_control_reply(reply)
  212. def selection_info(self):
  213. if not self.selected_task:
  214. return
  215. def alert_callback(mx, my, xs):
  216. my, mx = self.win.getmaxyx()
  217. y = count(xs)
  218. task = self.state.tasks[self.selected_task]
  219. info = task.info(extra=['state'])
  220. infoitems = [
  221. ('args', info.pop('args', None)),
  222. ('kwargs', info.pop('kwargs', None))
  223. ] + list(info.items())
  224. for key, value in infoitems:
  225. if key is None:
  226. continue
  227. value = str(value)
  228. curline = next(y)
  229. keys = key + ': '
  230. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  231. wrapped = wrap(value, mx - 2)
  232. if len(wrapped) == 1:
  233. self.win.addstr(
  234. curline, len(keys) + 3,
  235. abbr(wrapped[0],
  236. self.screen_width - (len(keys) + 3)))
  237. else:
  238. for subline in wrapped:
  239. nexty = next(y)
  240. if nexty >= my - 1:
  241. subline = ' ' * 4 + '[...]'
  242. elif nexty >= my:
  243. break
  244. self.win.addstr(
  245. nexty, 3,
  246. abbr(' ' * 4 + subline, self.screen_width - 4),
  247. curses.A_NORMAL,
  248. )
  249. return self.alert(
  250. alert_callback, 'Task details for {0.selected_task}'.format(self),
  251. )
  252. def selection_traceback(self):
  253. if not self.selected_task:
  254. return curses.beep()
  255. task = self.state.tasks[self.selected_task]
  256. if task.state not in states.EXCEPTION_STATES:
  257. return curses.beep()
  258. def alert_callback(my, mx, xs):
  259. y = count(xs)
  260. for line in task.traceback.split('\n'):
  261. self.win.addstr(next(y), 3, line)
  262. return self.alert(
  263. alert_callback,
  264. 'Task Exception Traceback for {0.selected_task}'.format(self),
  265. )
  266. def selection_result(self):
  267. if not self.selected_task:
  268. return
  269. def alert_callback(my, mx, xs):
  270. y = count(xs)
  271. task = self.state.tasks[self.selected_task]
  272. result = (getattr(task, 'result', None)
  273. or getattr(task, 'exception', None))
  274. for line in wrap(result, mx - 2):
  275. self.win.addstr(next(y), 3, line)
  276. return self.alert(
  277. alert_callback,
  278. 'Task Result for {0.selected_task}'.format(self),
  279. )
  280. def display_task_row(self, lineno, task):
  281. state_color = self.state_colors.get(task.state)
  282. attr = curses.A_NORMAL
  283. if task.uuid == self.selected_task:
  284. attr = curses.A_STANDOUT
  285. timestamp = datetime.utcfromtimestamp(
  286. task.timestamp or time.time(),
  287. )
  288. timef = timestamp.strftime('%H:%M:%S')
  289. hostname = task.worker.hostname if task.worker else '*NONE*'
  290. line = self.format_row(task.uuid, task.name,
  291. hostname,
  292. timef, task.state)
  293. self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)
  294. if state_color:
  295. self.win.addstr(lineno,
  296. len(line) - STATE_WIDTH + BORDER_SPACING - 1,
  297. task.state, state_color | attr)
  298. def draw(self):
  299. with self.lock:
  300. win = self.win
  301. self.handle_keypress()
  302. x = LEFT_BORDER_OFFSET
  303. y = blank_line = count(2)
  304. my, mx = win.getmaxyx()
  305. win.erase()
  306. win.bkgd(' ', curses.color_pair(1))
  307. win.border()
  308. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  309. next(blank_line)
  310. win.addstr(next(y), x, self.format_row('UUID', 'TASK',
  311. 'WORKER', 'TIME', 'STATE'),
  312. curses.A_BOLD | curses.A_UNDERLINE)
  313. tasks = self.tasks
  314. if tasks:
  315. for row, (uuid, task) in enumerate(tasks):
  316. if row > self.display_height:
  317. break
  318. if task.uuid:
  319. lineno = next(y)
  320. self.display_task_row(lineno, task)
  321. # -- Footer
  322. next(blank_line)
  323. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)
  324. # Selected Task Info
  325. if self.selected_task:
  326. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  327. info = 'Missing extended info'
  328. detail = ''
  329. try:
  330. selection = self.state.tasks[self.selected_task]
  331. except KeyError:
  332. pass
  333. else:
  334. info = selection.info()
  335. if 'runtime' in info:
  336. info['runtime'] = '{0:.2f}'.format(info['runtime'])
  337. if 'result' in info:
  338. info['result'] = abbr(info['result'], 16)
  339. info = ' '.join(
  340. '{0}={1}'.format(key, value)
  341. for key, value in items(info)
  342. )
  343. detail = '... -> key i'
  344. infowin = abbr(info,
  345. self.screen_width - len(self.selected_str) - 2,
  346. detail)
  347. win.addstr(my - 5, x + len(self.selected_str), infowin)
  348. # Make ellipsis bold
  349. if detail in infowin:
  350. detailpos = len(infowin) - len(detail)
  351. win.addstr(my - 5, x + len(self.selected_str) + detailpos,
  352. detail, curses.A_BOLD)
  353. else:
  354. win.addstr(my - 5, x, 'No task selected', curses.A_NORMAL)
  355. # Workers
  356. if self.workers:
  357. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  358. win.addstr(my - 4, x + len(self.online_str),
  359. ', '.join(sorted(self.workers)), curses.A_NORMAL)
  360. else:
  361. win.addstr(my - 4, x, 'No workers discovered.')
  362. # Info
  363. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  364. win.addstr(
  365. my - 3, x + len(self.info_str),
  366. STATUS_SCREEN.format(
  367. s=self.state,
  368. w_alive=len([w for w in values(self.state.workers)
  369. if w.alive]),
  370. w_all=len(self.state.workers),
  371. ),
  372. curses.A_DIM,
  373. )
  374. # Help
  375. self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
  376. self.safe_add_str(my - 2, x + len(self.help_title), self.help,
  377. curses.A_DIM)
  378. win.refresh()
  379. def safe_add_str(self, y, x, string, *args, **kwargs):
  380. if x + len(string) > self.screen_width:
  381. string = string[:self.screen_width - x]
  382. self.win.addstr(y, x, string, *args, **kwargs)
  383. def init_screen(self):
  384. with self.lock:
  385. self.win = curses.initscr()
  386. self.win.nodelay(True)
  387. self.win.keypad(True)
  388. curses.start_color()
  389. curses.init_pair(1, self.foreground, self.background)
  390. # exception states
  391. curses.init_pair(2, curses.COLOR_RED, self.background)
  392. # successful state
  393. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  394. # revoked state
  395. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  396. # greeting
  397. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  398. # started state
  399. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  400. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  401. states.REVOKED: curses.color_pair(4),
  402. states.STARTED: curses.color_pair(6)}
  403. for state in states.EXCEPTION_STATES:
  404. self.state_colors[state] = curses.color_pair(2)
  405. curses.cbreak()
  406. def resetscreen(self):
  407. with self.lock:
  408. curses.nocbreak()
  409. self.win.keypad(False)
  410. curses.echo()
  411. curses.endwin()
  412. def nap(self):
  413. curses.napms(self.screen_delay)
  414. @property
  415. def tasks(self):
  416. return list(self.state.tasks_by_time(limit=self.limit))
  417. @property
  418. def workers(self):
  419. return [hostname for hostname, w in items(self.state.workers)
  420. if w.alive]
  421. class DisplayThread(threading.Thread): # pragma: no cover
  422. def __init__(self, display):
  423. self.display = display
  424. self.shutdown = False
  425. threading.Thread.__init__(self)
  426. def run(self):
  427. while not self.shutdown:
  428. self.display.draw()
  429. self.display.nap()
  430. def capture_events(app, state, display): # pragma: no cover
  431. def on_connection_error(exc, interval):
  432. print('Connection Error: {0!r}. Retry in {1}s.'.format(
  433. exc, interval), file=sys.stderr)
  434. while 1:
  435. print('-> evtop: starting capture...', file=sys.stderr)
  436. with app.connection() as conn:
  437. try:
  438. conn.ensure_connection(on_connection_error,
  439. app.conf.BROKER_CONNECTION_MAX_RETRIES)
  440. recv = app.events.Receiver(conn, handlers={'*': state.event})
  441. display.resetscreen()
  442. display.init_screen()
  443. recv.capture()
  444. except conn.connection_errors + conn.channel_errors as exc:
  445. print('Connection lost: {0!r}'.format(exc), file=sys.stderr)
  446. def evtop(app=None): # pragma: no cover
  447. app = app_or_default(app)
  448. state = app.events.State()
  449. display = CursesMonitor(state, app=app)
  450. display.init_screen()
  451. refresher = DisplayThread(display)
  452. refresher.start()
  453. try:
  454. capture_events(app, state, display)
  455. except Exception:
  456. refresher.shutdown = True
  457. refresher.join()
  458. display.resetscreen()
  459. raise
  460. except (KeyboardInterrupt, SystemExit):
  461. refresher.shutdown = True
  462. refresher.join()
  463. display.resetscreen()
  464. if __name__ == '__main__': # pragma: no cover
  465. evtop()