cursesmon.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.cursesmon
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. Graphical monitor of Celery events using curses.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import curses
  12. import sys
  13. import threading
  14. import time
  15. from datetime import datetime
  16. from itertools import count
  17. from textwrap import wrap
  18. from math import ceil
  19. from celery import __version__
  20. from celery import states
  21. from celery.app import app_or_default
  22. from celery.utils.text import abbr, abbrtask
  23. BORDER_SPACING = 4
  24. LEFT_BORDER_OFFSET = 3
  25. UUID_WIDTH = 36
  26. STATE_WIDTH = 8
  27. TIMESTAMP_WIDTH = 8
  28. MIN_WORKER_WIDTH = 15
  29. MIN_TASK_WIDTH = 16
  30. # this module is considered experimental
  31. # we don't care about coverage.
  32. class CursesMonitor(object): # pragma: no cover
  33. keymap = {}
  34. win = None
  35. screen_width = None
  36. screen_delay = 10
  37. selected_task = None
  38. selected_position = 0
  39. selected_str = "Selected: "
  40. foreground = curses.COLOR_BLACK
  41. background = curses.COLOR_WHITE
  42. online_str = "Workers online: "
  43. help_title = "Keys: "
  44. help = ("j:up k:down i:info t:traceback r:result c:revoke ^c: quit")
  45. greet = "celeryev %s" % __version__
  46. info_str = "Info: "
  47. def __init__(self, state, keymap=None, app=None):
  48. self.app = app_or_default(app)
  49. self.keymap = keymap or self.keymap
  50. self.state = state
  51. default_keymap = {"J": self.move_selection_down,
  52. "K": self.move_selection_up,
  53. "C": self.revoke_selection,
  54. "T": self.selection_traceback,
  55. "R": self.selection_result,
  56. "I": self.selection_info,
  57. "L": self.selection_rate_limit}
  58. self.keymap = dict(default_keymap, **self.keymap)
  59. def format_row(self, uuid, task, worker, timestamp, state):
  60. mx = self.display_width
  61. # include spacing
  62. detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
  63. uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH
  64. if uuid_space < UUID_WIDTH:
  65. uuid_width = uuid_space
  66. else:
  67. uuid_width = UUID_WIDTH
  68. detail_width = detail_width - uuid_width - 1
  69. task_width = int(ceil(detail_width / 2.0))
  70. worker_width = detail_width - task_width - 1
  71. uuid = abbr(uuid, uuid_width).ljust(uuid_width)
  72. worker = abbr(worker, worker_width).ljust(worker_width)
  73. task = abbrtask(task, task_width).ljust(task_width)
  74. state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
  75. timestamp = timestamp.ljust(TIMESTAMP_WIDTH)
  76. row = "%s %s %s %s %s " % (uuid, worker, task, timestamp, state)
  77. if self.screen_width is None:
  78. self.screen_width = len(row[:mx])
  79. return row[:mx]
  80. @property
  81. def screen_width(self):
  82. _, mx = self.win.getmaxyx()
  83. return mx
  84. @property
  85. def screen_height(self):
  86. my, _ = self.win.getmaxyx()
  87. return my
  88. @property
  89. def display_width(self):
  90. _, mx = self.win.getmaxyx()
  91. return mx - BORDER_SPACING
  92. @property
  93. def display_height(self):
  94. my, _ = self.win.getmaxyx()
  95. return my - 10
  96. @property
  97. def limit(self):
  98. return self.display_height
  99. def find_position(self):
  100. if not self.tasks:
  101. return 0
  102. for i, e in enumerate(self.tasks):
  103. if self.selected_task == e[0]:
  104. return i
  105. return 0
  106. def move_selection_up(self):
  107. self.move_selection(-1)
  108. def move_selection_down(self):
  109. self.move_selection(1)
  110. def move_selection(self, direction=1):
  111. if not self.tasks:
  112. return
  113. pos = self.find_position()
  114. try:
  115. self.selected_task = self.tasks[pos + direction][0]
  116. except IndexError:
  117. self.selected_task = self.tasks[0][0]
  118. keyalias = {curses.KEY_DOWN: "J",
  119. curses.KEY_UP: "K",
  120. curses.KEY_ENTER: "I"}
  121. def handle_keypress(self):
  122. try:
  123. key = self.win.getkey().upper()
  124. except:
  125. return
  126. key = self.keyalias.get(key) or key
  127. handler = self.keymap.get(key)
  128. if handler is not None:
  129. handler()
  130. def alert(self, callback, title=None):
  131. self.win.erase()
  132. my, mx = self.win.getmaxyx()
  133. y = blank_line = count(2).next
  134. if title:
  135. self.win.addstr(y(), 3, title, curses.A_BOLD | curses.A_UNDERLINE)
  136. blank_line()
  137. callback(my, mx, y())
  138. self.win.addstr(my - 1, 0, "Press any key to continue...",
  139. curses.A_BOLD)
  140. self.win.refresh()
  141. while 1:
  142. try:
  143. return self.win.getkey().upper()
  144. except:
  145. pass
  146. def selection_rate_limit(self):
  147. if not self.selected_task:
  148. return curses.beep()
  149. task = self.state.tasks[self.selected_task]
  150. if not task.name:
  151. return curses.beep()
  152. my, mx = self.win.getmaxyx()
  153. r = "New rate limit: "
  154. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  155. self.win.addstr(my - 2, len(r) + 3, " " * (mx - len(r)))
  156. rlimit = self.readline(my - 2, 3 + len(r))
  157. if rlimit:
  158. reply = self.app.control.rate_limit(task.name,
  159. rlimit.strip(), reply=True)
  160. self.alert_remote_control_reply(reply)
  161. def alert_remote_control_reply(self, reply):
  162. def callback(my, mx, xs):
  163. y = count(xs).next
  164. if not reply:
  165. self.win.addstr(y(), 3, "No replies received in 1s deadline.",
  166. curses.A_BOLD + curses.color_pair(2))
  167. return
  168. for subreply in reply:
  169. curline = y()
  170. host, response = subreply.items()[0]
  171. host = "%s: " % host
  172. self.win.addstr(curline, 3, host, curses.A_BOLD)
  173. attr = curses.A_NORMAL
  174. text = ""
  175. if "error" in response:
  176. text = response["error"]
  177. attr |= curses.color_pair(2)
  178. elif "ok" in response:
  179. text = response["ok"]
  180. attr |= curses.color_pair(3)
  181. self.win.addstr(curline, 3 + len(host), text, attr)
  182. return self.alert(callback, "Remote Control Command Replies")
  183. def readline(self, x, y):
  184. buffer = str()
  185. curses.echo()
  186. try:
  187. i = 0
  188. while 1:
  189. ch = self.win.getch(x, y + i)
  190. if ch != -1:
  191. if ch in (10, curses.KEY_ENTER): # enter
  192. break
  193. if ch in (27, ):
  194. buffer = str()
  195. break
  196. buffer += chr(ch)
  197. i += 1
  198. finally:
  199. curses.noecho()
  200. return buffer
  201. def revoke_selection(self):
  202. if not self.selected_task:
  203. return curses.beep()
  204. reply = self.app.control.revoke(self.selected_task, reply=True)
  205. self.alert_remote_control_reply(reply)
  206. def selection_info(self):
  207. if not self.selected_task:
  208. return
  209. def alert_callback(mx, my, xs):
  210. my, mx = self.win.getmaxyx()
  211. y = count(xs).next
  212. task = self.state.tasks[self.selected_task]
  213. info = task.info(extra=["state"])
  214. infoitems = [("args", info.pop("args", None)),
  215. ("kwargs", info.pop("kwargs", None))] + info.items()
  216. for key, value in infoitems:
  217. if key is None:
  218. continue
  219. value = str(value)
  220. curline = y()
  221. keys = key + ": "
  222. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  223. wrapped = wrap(value, mx - 2)
  224. if len(wrapped) == 1:
  225. self.win.addstr(curline, len(keys) + 3,
  226. abbr(wrapped[0],
  227. self.screen_width - (len(keys) + 3)))
  228. else:
  229. for subline in wrapped:
  230. nexty = y()
  231. if nexty >= my - 1:
  232. subline = " " * 4 + "[...]"
  233. elif nexty >= my:
  234. break
  235. self.win.addstr(nexty, 3,
  236. abbr(" " * 4 + subline, self.screen_width - 4),
  237. curses.A_NORMAL)
  238. return self.alert(alert_callback,
  239. "Task details for %s" % self.selected_task)
  240. def selection_traceback(self):
  241. if not self.selected_task:
  242. return curses.beep()
  243. task = self.state.tasks[self.selected_task]
  244. if task.state not in states.EXCEPTION_STATES:
  245. return curses.beep()
  246. def alert_callback(my, mx, xs):
  247. y = count(xs).next
  248. for line in task.traceback.split("\n"):
  249. self.win.addstr(y(), 3, line)
  250. return self.alert(alert_callback,
  251. "Task Exception Traceback for %s" % self.selected_task)
  252. def selection_result(self):
  253. if not self.selected_task:
  254. return
  255. def alert_callback(my, mx, xs):
  256. y = count(xs).next
  257. task = self.state.tasks[self.selected_task]
  258. result = getattr(task, "result", None) or getattr(task,
  259. "exception", None)
  260. for line in wrap(result, mx - 2):
  261. self.win.addstr(y(), 3, line)
  262. return self.alert(alert_callback,
  263. "Task Result for %s" % self.selected_task)
  264. def display_task_row(self, lineno, task):
  265. state_color = self.state_colors.get(task.state)
  266. attr = curses.A_NORMAL
  267. if task.uuid == self.selected_task:
  268. attr = curses.A_STANDOUT
  269. timestamp = datetime.utcfromtimestamp(
  270. task.timestamp or time.time())
  271. timef = timestamp.strftime("%H:%M:%S")
  272. hostname = task.worker.hostname if task.worker else '*NONE*'
  273. line = self.format_row(task.uuid, task.name,
  274. hostname,
  275. timef, task.state)
  276. self.win.addstr(lineno, LEFT_BORDER_OFFSET, line, attr)
  277. if state_color:
  278. self.win.addstr(lineno,
  279. len(line) - STATE_WIDTH + BORDER_SPACING - 1,
  280. task.state, state_color | attr)
  281. def draw(self):
  282. win = self.win
  283. self.handle_keypress()
  284. x = LEFT_BORDER_OFFSET
  285. y = blank_line = count(2).next
  286. my, mx = win.getmaxyx()
  287. win.erase()
  288. win.bkgd(" ", curses.color_pair(1))
  289. win.border()
  290. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  291. blank_line()
  292. win.addstr(y(), x, self.format_row("UUID", "TASK",
  293. "WORKER", "TIME", "STATE"),
  294. curses.A_BOLD | curses.A_UNDERLINE)
  295. tasks = self.tasks
  296. if tasks:
  297. for row, (uuid, task) in enumerate(tasks):
  298. if row > self.display_height:
  299. break
  300. if task.uuid:
  301. lineno = y()
  302. self.display_task_row(lineno, task)
  303. # -- Footer
  304. blank_line()
  305. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width - 4)
  306. # Selected Task Info
  307. if self.selected_task:
  308. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  309. info = "Missing extended info"
  310. detail = ""
  311. try:
  312. selection = self.state.tasks[self.selected_task]
  313. except KeyError:
  314. pass
  315. else:
  316. info = selection.info(["args", "kwargs",
  317. "result", "runtime", "eta"])
  318. if "runtime" in info:
  319. info["runtime"] = "%.2fs" % info["runtime"]
  320. if "result" in info:
  321. info["result"] = abbr(info["result"], 16)
  322. info = " ".join("%s=%s" % (key, value)
  323. for key, value in info.items())
  324. detail = "... -> key i"
  325. infowin = abbr(info,
  326. self.screen_width - len(self.selected_str) - 2,
  327. detail)
  328. win.addstr(my - 5, x + len(self.selected_str), infowin)
  329. # Make ellipsis bold
  330. if detail in infowin:
  331. detailpos = len(infowin) - len(detail)
  332. win.addstr(my - 5, x + len(self.selected_str) + detailpos,
  333. detail, curses.A_BOLD)
  334. else:
  335. win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)
  336. # Workers
  337. if self.workers:
  338. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  339. win.addstr(my - 4, x + len(self.online_str),
  340. ", ".join(sorted(self.workers)), curses.A_NORMAL)
  341. else:
  342. win.addstr(my - 4, x, "No workers discovered.")
  343. # Info
  344. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  345. win.addstr(my - 3, x + len(self.info_str),
  346. "events:%s tasks:%s workers:%s/%s" % (
  347. self.state.event_count, self.state.task_count,
  348. len([w for w in self.state.workers.values()
  349. if w.alive]),
  350. len(self.state.workers)),
  351. curses.A_DIM)
  352. # Help
  353. self.safe_add_str(my - 2, x, self.help_title, curses.A_BOLD)
  354. self.safe_add_str(my - 2, x + len(self.help_title), self.help,
  355. curses.A_DIM)
  356. win.refresh()
  357. def safe_add_str(self, y, x, string, *args, **kwargs):
  358. if x + len(string) > self.screen_width:
  359. string = string[:self.screen_width - x]
  360. self.win.addstr(y, x, string, *args, **kwargs)
  361. def init_screen(self):
  362. self.win = curses.initscr()
  363. self.win.nodelay(True)
  364. self.win.keypad(True)
  365. curses.start_color()
  366. curses.init_pair(1, self.foreground, self.background)
  367. # exception states
  368. curses.init_pair(2, curses.COLOR_RED, self.background)
  369. # successful state
  370. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  371. # revoked state
  372. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  373. # greeting
  374. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  375. # started state
  376. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  377. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  378. states.REVOKED: curses.color_pair(4),
  379. states.STARTED: curses.color_pair(6)}
  380. for state in states.EXCEPTION_STATES:
  381. self.state_colors[state] = curses.color_pair(2)
  382. curses.cbreak()
  383. def resetscreen(self):
  384. curses.nocbreak()
  385. self.win.keypad(False)
  386. curses.echo()
  387. curses.endwin()
  388. def nap(self):
  389. curses.napms(self.screen_delay)
  390. @property
  391. def tasks(self):
  392. return self.state.tasks_by_timestamp()[:self.limit]
  393. @property
  394. def workers(self):
  395. return [hostname
  396. for hostname, w in self.state.workers.items()
  397. if w.alive]
  398. class DisplayThread(threading.Thread): # pragma: no cover
  399. def __init__(self, display):
  400. self.display = display
  401. self.shutdown = False
  402. threading.Thread.__init__(self)
  403. def run(self):
  404. while not self.shutdown:
  405. self.display.draw()
  406. self.display.nap()
  407. def capture_events(app, state, display): # pragma: no cover
  408. def on_connection_error(exc, interval):
  409. sys.stderr.write("Connection Error: %r. Retry in %ss." % (
  410. exc, interval))
  411. while 1:
  412. sys.stderr.write("-> evtop: starting capture...\n")
  413. with app.broker_connection() as conn:
  414. try:
  415. conn.ensure_connection(on_connection_error,
  416. app.conf.BROKER_CONNECTION_MAX_RETRIES)
  417. recv = app.events.Receiver(conn, handlers={"*": state.event})
  418. display.resetscreen()
  419. display.init_screen()
  420. with recv.consumer():
  421. recv.drain_events(timeout=1, ignore_timeouts=True)
  422. except (conn.connection_errors, conn.channel_errors), exc:
  423. sys.stderr.write("Connection lost: %r" % (exc, ))
  424. def evtop(app=None): # pragma: no cover
  425. app = app_or_default(app)
  426. state = app.events.State()
  427. display = CursesMonitor(state, app=app)
  428. display.init_screen()
  429. refresher = DisplayThread(display)
  430. refresher.start()
  431. try:
  432. capture_events(app, state, display)
  433. except Exception:
  434. refresher.shutdown = True
  435. refresher.join()
  436. display.resetscreen()
  437. raise
  438. except (KeyboardInterrupt, SystemExit):
  439. refresher.shutdown = True
  440. refresher.join()
  441. display.resetscreen()
  442. if __name__ == "__main__": # pragma: no cover
  443. evtop()