celeryev.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. import sys
  2. import time
  3. import curses
  4. import socket
  5. import optparse
  6. import threading
  7. from datetime import datetime
  8. from textwrap import wrap
  9. from itertools import count
  10. from carrot.utils import rpartition
  11. import celery
  12. from celery import states
  13. from celery.task import control
  14. from celery.events import EventReceiver
  15. from celery.events.state import State
  16. from celery.messaging import establish_connection
  17. from celery.datastructures import LocalCache
  18. from celery.utils import instantiate
# Printable task descriptions keyed by task uuid.  Dumper.on_event stores an
# entry when a "task-received" event arrives and looks it up again for the
# later events of the same task.
# NOTE(review): LocalCache(0xFFF) presumably bounds the cache to 4095
# entries -- confirm against celery.datastructures.LocalCache.
TASK_NAMES = LocalCache(0xFFF)
  20. HUMAN_TYPES = {"worker-offline": "shutdown",
  21. "worker-online": "started",
  22. "worker-heartbeat": "heartbeat"}
  23. OPTION_LIST = (
  24. optparse.make_option('-d', '--DUMP',
  25. action="store_true", dest="dump",
  26. help="Dump events to stdout."),
  27. optparse.make_option('-c', '--camera',
  28. action="store", dest="camera",
  29. help="Camera class to take event snapshots with."),
  30. optparse.make_option('-f', '--frequency', '--freq',
  31. action="store", dest="frequency", type="float", default=1.0,
  32. help="Recording: Snapshot frequency."),
  33. optparse.make_option('-x', '--verbose',
  34. action="store_true", dest="verbose",
  35. help="Show more output.")
  36. )
  37. def humanize_type(type):
  38. try:
  39. return HUMAN_TYPES[type.lower()]
  40. except KeyError:
  41. return type.lower().replace("-", " ")
  42. class Dumper(object):
  43. def on_event(self, event):
  44. timestamp = datetime.fromtimestamp(event.pop("timestamp"))
  45. type = event.pop("type").lower()
  46. hostname = event.pop("hostname")
  47. if type.startswith("task-"):
  48. uuid = event.pop("uuid")
  49. if type.startswith("task-received"):
  50. task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
  51. event.pop("name"), uuid,
  52. event.pop("args"),
  53. event.pop("kwargs"))
  54. else:
  55. task = TASK_NAMES.get(uuid, "")
  56. return self.format_task_event(hostname, timestamp,
  57. type, task, event)
  58. fields = ", ".join("%s=%s" % (key, event[key])
  59. for key in sorted(event.keys()))
  60. sep = fields and ":" or ""
  61. print("%s [%s] %s%s %s" % (hostname, timestamp,
  62. humanize_type(type), sep, fields))
  63. def format_task_event(self, hostname, timestamp, type, task, event):
  64. fields = ", ".join("%s=%s" % (key, event[key])
  65. for key in sorted(event.keys()))
  66. sep = fields and ":" or ""
  67. print("%s [%s] %s%s %s %s" % (hostname, timestamp,
  68. humanize_type(type), sep, task, fields))
  69. def abbr(S, max, dots=True):
  70. if S is None:
  71. return "???"
  72. if len(S) > max:
  73. return dots and S[:max-3] + "..." or S[:max-3]
  74. return S
  75. def abbrtask(S, max):
  76. if S is None:
  77. return "???"
  78. if len(S) > max:
  79. module, _, cls = rpartition(S, ".")
  80. module = abbr(module, max - len(cls), False)
  81. return module + "[.]" + cls
  82. return S
  83. class CursesMonitor(object):
  84. keymap = {}
  85. win = None
  86. screen_width = None
  87. screen_delay = 0.1
  88. selected_task = None
  89. selected_position = 0
  90. selected_str = "Selected: "
  91. limit = 20
  92. foreground = curses.COLOR_BLACK
  93. background = curses.COLOR_WHITE
  94. online_str = "Workers online: "
  95. help_title = "Keys: "
  96. help = ("j:up k:down i:info t:traceback r:result c:revoke ^c: quit")
  97. greet = "celeryev %s" % celery.__version__
  98. info_str = "Info: "
  99. def __init__(self, state, keymap=None):
  100. self.keymap = keymap or self.keymap
  101. self.state = state
  102. default_keymap = {"J": self.move_selection_down,
  103. "K": self.move_selection_up,
  104. "C": self.revoke_selection,
  105. "T": self.selection_traceback,
  106. "R": self.selection_result,
  107. "I": self.selection_info,
  108. "L": self.selection_rate_limit}
  109. self.keymap = dict(default_keymap, **self.keymap)
  110. def format_row(self, uuid, worker, task, timestamp, state):
  111. my, mx = self.win.getmaxyx()
  112. mx = mx - 3
  113. uuid_max = 36
  114. if mx < 88:
  115. uuid_max = mx - 52 - 2
  116. uuid = abbr(uuid, uuid_max).ljust(uuid_max)
  117. worker = abbr(worker, 16).ljust(16)
  118. task = abbrtask(task, 16).ljust(16)
  119. state = abbr(state, 8).ljust(8)
  120. timestamp = timestamp.ljust(8)
  121. row = "%s %s %s %s %s " % (uuid, worker, task, timestamp, state)
  122. if self.screen_width is None:
  123. self.screen_width = len(row[:mx])
  124. return row[:mx]
  125. def find_position(self):
  126. if not self.tasks:
  127. return 0
  128. for i, e in enumerate(self.tasks):
  129. if self.selected_task == e[0]:
  130. return i
  131. return 0
  132. def move_selection_up(self):
  133. self.move_selection(-1)
  134. def move_selection_down(self):
  135. self.move_selection(1)
  136. def move_selection(self, direction=1):
  137. if not self.tasks:
  138. return
  139. pos = self.find_position()
  140. try:
  141. self.selected_task = self.tasks[pos + direction][0]
  142. except IndexError:
  143. self.selected_task = self.tasks[0][0]
  144. keyalias = {curses.KEY_DOWN: "J",
  145. curses.KEY_UP: "K",
  146. curses.KEY_ENTER: "I"}
  147. def handle_keypress(self):
  148. try:
  149. key = self.win.getkey().upper()
  150. except:
  151. return
  152. key = self.keyalias.get(key) or key
  153. handler = self.keymap.get(key)
  154. if handler is not None:
  155. handler()
  156. def alert(self, callback, title=None):
  157. self.win.erase()
  158. my, mx = self.win.getmaxyx()
  159. y = blank_line = count(2).next
  160. if title:
  161. self.win.addstr(y(), 3, title, curses.A_BOLD | curses.A_UNDERLINE)
  162. blank_line()
  163. callback(my, mx, y())
  164. self.win.addstr(my - 1, 0, "Press any key to continue...",
  165. curses.A_BOLD)
  166. self.win.refresh()
  167. while 1:
  168. try:
  169. return self.win.getkey().upper()
  170. except:
  171. pass
  172. def selection_rate_limit(self):
  173. if not self.selected_task:
  174. return curses.beep()
  175. task = self.state.tasks[self.selected_task]
  176. if not task.name:
  177. return curses.beep()
  178. my, mx = self.win.getmaxyx()
  179. r = "New rate limit: "
  180. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  181. self.win.addstr(my - 2, len(r) + 3, " " * (mx - len(r)))
  182. rlimit = self.readline(my - 2, 3 + len(r))
  183. if rlimit:
  184. reply = control.rate_limit(task.name, rlimit.strip(), reply=True)
  185. self.alert_remote_control_reply(reply)
  186. def alert_remote_control_reply(self, reply):
  187. def callback(my, mx, xs):
  188. y = count(xs).next
  189. if not reply:
  190. self.win.addstr(y(), 3, "No replies received in 1s deadline.",
  191. curses.A_BOLD + curses.color_pair(2))
  192. return
  193. for subreply in reply:
  194. curline = y()
  195. host, response = subreply.items()[0]
  196. host = "%s: " % host
  197. self.win.addstr(curline, 3, host, curses.A_BOLD)
  198. attr = curses.A_NORMAL
  199. text = ""
  200. if "error" in response:
  201. text = response["error"]
  202. attr |= curses.color_pair(2)
  203. elif "ok" in response:
  204. text = response["ok"]
  205. attr |= curses.color_pair(3)
  206. self.win.addstr(curline, 3 + len(host), text, attr)
  207. return self.alert(callback, "Remote Control Command Replies")
  208. def readline(self, x, y):
  209. buffer = str()
  210. curses.echo()
  211. try:
  212. i = 0
  213. while True:
  214. ch = self.win.getch(x, y + i)
  215. if ch != -1:
  216. if ch in (10, curses.KEY_ENTER): # enter
  217. break
  218. if ch in (27, ):
  219. buffer = str()
  220. break
  221. buffer += chr(ch)
  222. i += 1
  223. finally:
  224. curses.noecho()
  225. return buffer
  226. def revoke_selection(self):
  227. if not self.selected_task:
  228. return curses.beep()
  229. reply = control.revoke(self.selected_task, reply=True)
  230. self.alert_remote_control_reply(reply)
  231. def selection_info(self):
  232. if not self.selected_task:
  233. return
  234. def alert_callback(mx, my, xs):
  235. y = count(xs).next
  236. task = self.state.tasks[self.selected_task]
  237. info = task.info(extra=["state"])
  238. infoitems = [("args", info.pop("args", None)),
  239. ("kwargs", info.pop("kwargs", None))] + info.items()
  240. for key, value in infoitems:
  241. if key is None:
  242. continue
  243. curline = y()
  244. keys = key + ": "
  245. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  246. wrapped = wrap(str(value), mx - 2)
  247. if len(wrapped) == 1:
  248. self.win.addstr(curline, len(keys) + 3, wrapped[0])
  249. else:
  250. for subline in wrapped:
  251. self.win.addstr(y(), 3, " " * 4 + subline,
  252. curses.A_NORMAL)
  253. return self.alert(alert_callback,
  254. "Task details for %s" % self.selected_task)
  255. def selection_traceback(self):
  256. if not self.selected_task:
  257. return curses.beep()
  258. task = self.state.tasks[self.selected_task]
  259. if task.state not in states.EXCEPTION_STATES:
  260. return curses.beep()
  261. def alert_callback(my, mx, xs):
  262. y = count(xs).next
  263. for line in task.traceback.split("\n"):
  264. self.win.addstr(y(), 3, line)
  265. return self.alert(alert_callback,
  266. "Task Exception Traceback for %s" % self.selected_task)
  267. def selection_result(self):
  268. if not self.selected_task:
  269. return
  270. def alert_callback(my, mx, xs):
  271. y = count(xs).next
  272. task = self.state.tasks[self.selected_task]
  273. result = getattr(task, "result", None) or getattr(task,
  274. "exception", None)
  275. for line in wrap(result, mx - 2):
  276. self.win.addstr(y(), 3, line)
  277. return self.alert(alert_callback,
  278. "Task Result for %s" % self.selected_task)
  279. def draw(self):
  280. win = self.win
  281. self.handle_keypress()
  282. x = 3
  283. y = blank_line = count(2).next
  284. my, mx = win.getmaxyx()
  285. win.erase()
  286. win.bkgd(" ", curses.color_pair(1))
  287. win.border()
  288. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  289. blank_line()
  290. win.addstr(y(), x, self.format_row("UUID", "TASK",
  291. "WORKER", "TIME", "STATE"),
  292. curses.A_BOLD | curses.A_UNDERLINE)
  293. tasks = self.tasks
  294. if tasks:
  295. for uuid, task in tasks:
  296. if task.uuid:
  297. state_color = self.state_colors.get(task.state)
  298. attr = curses.A_NORMAL
  299. if task.uuid == self.selected_task:
  300. attr = curses.A_STANDOUT
  301. timestamp = datetime.fromtimestamp(
  302. task.timestamp or time.time())
  303. timef = timestamp.strftime("%H:%M:%S")
  304. line = self.format_row(uuid, task.name,
  305. task.worker.hostname,
  306. timef, task.state)
  307. lineno = y()
  308. win.addstr(lineno, x, line, attr)
  309. if state_color:
  310. win.addstr(lineno, len(line) - len(task.state) + 1,
  311. task.state, state_color | attr)
  312. if task.ready:
  313. task.visited = time.time()
  314. # -- Footer
  315. blank_line()
  316. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width)
  317. # Selected Task Info
  318. if self.selected_task:
  319. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  320. info = "Missing extended info"
  321. try:
  322. selection = self.state.tasks[self.selected_task]
  323. except KeyError:
  324. pass
  325. else:
  326. info = selection.info(["args", "kwargs",
  327. "result", "runtime", "eta"])
  328. if "runtime" in info:
  329. info["runtime"] = "%.2fs" % info["runtime"]
  330. if "result" in info:
  331. info["result"] = abbr(info["result"], 16)
  332. info = " ".join("%s=%s" % (key, value)
  333. for key, value in info.items())
  334. win.addstr(my - 5, x + len(self.selected_str), info)
  335. else:
  336. win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)
  337. # Workers
  338. if self.workers:
  339. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  340. win.addstr(my - 4, x + len(self.online_str),
  341. ", ".join(self.workers), curses.A_NORMAL)
  342. else:
  343. win.addstr(my - 4, x, "No workers discovered.")
  344. # Info
  345. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  346. win.addstr(my - 3, x + len(self.info_str),
  347. "events:%s tasks:%s workers:%s/%s" % (
  348. self.state.event_count, self.state.task_count,
  349. len([w for w in self.state.workers.values()
  350. if w.alive]),
  351. len(self.state.workers)),
  352. curses.A_DIM)
  353. # Help
  354. win.addstr(my - 2, x, self.help_title, curses.A_BOLD)
  355. win.addstr(my - 2, x + len(self.help_title), self.help, curses.A_DIM)
  356. win.refresh()
  357. def init_screen(self):
  358. self.win = curses.initscr()
  359. self.win.nodelay(True)
  360. self.win.keypad(True)
  361. curses.start_color()
  362. curses.init_pair(1, self.foreground, self.background)
  363. # exception states
  364. curses.init_pair(2, curses.COLOR_RED, self.background)
  365. # successful state
  366. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  367. # revoked state
  368. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  369. # greeting
  370. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  371. # started state
  372. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  373. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  374. states.REVOKED: curses.color_pair(4),
  375. states.STARTED: curses.color_pair(6)}
  376. for state in states.EXCEPTION_STATES:
  377. self.state_colors[state] = curses.color_pair(2)
  378. curses.cbreak()
  379. def resetscreen(self):
  380. curses.nocbreak()
  381. self.win.keypad(False)
  382. curses.echo()
  383. curses.endwin()
  384. def nap(self):
  385. curses.napms(int(self.screen_delay * 1000))
  386. @property
  387. def tasks(self):
  388. return self.state.tasks_by_timestamp()[:self.limit]
  389. @property
  390. def workers(self):
  391. return [hostname
  392. for hostname, w in self.state.workers.items()
  393. if w.alive]
  394. class DisplayThread(threading.Thread):
  395. def __init__(self, display):
  396. self.display = display
  397. self.shutdown = False
  398. threading.Thread.__init__(self)
  399. def run(self):
  400. while not self.shutdown:
  401. self.display.draw()
  402. self.display.nap()
  403. def run_camera(camera, freq, verbose=False):
  404. sys.stderr.write(
  405. "-> celeryev: Taking snapshots with %s (every %s secs.)\n" % (
  406. camera, freq))
  407. state = State()
  408. cam = instantiate(camera, state, freq=freq, verbose=verbose)
  409. cam.install()
  410. conn = establish_connection()
  411. recv = EventReceiver(conn, handlers={"*": state.event})
  412. try:
  413. recv.capture(limit=None)
  414. finally:
  415. cam.cancel()
  416. conn.close()
  417. def eventtop():
  418. sys.stderr.write("-> celeryev: starting capture...\n")
  419. state = State()
  420. display = CursesMonitor(state)
  421. display.init_screen()
  422. refresher = DisplayThread(display)
  423. refresher.start()
  424. conn = establish_connection()
  425. recv = EventReceiver(conn, handlers={"*": state.event})
  426. try:
  427. recv.capture(limit=None)
  428. except Exception:
  429. refresher.shutdown = True
  430. refresher.join()
  431. display.resetscreen()
  432. raise
  433. except (KeyboardInterrupt, SystemExit):
  434. conn and conn.close()
  435. refresher.shutdown = True
  436. refresher.join()
  437. display.resetscreen()
  438. def eventdump():
  439. sys.stderr.write("-> celeryev: starting capture...\n")
  440. dumper = Dumper()
  441. conn = establish_connection()
  442. recv = EventReceiver(conn, handlers={"*": dumper.on_event})
  443. try:
  444. recv.capture()
  445. except (KeyboardInterrupt, SystemExit):
  446. conn and conn.close()
  447. def run_celeryev(dump=False, camera=None, frequency=1.0, verbose=False,
  448. **kwargs):
  449. if dump:
  450. return eventdump()
  451. if camera:
  452. return run_camera(camera, frequency, verbose=verbose)
  453. return eventtop()
  454. def parse_options(arguments):
  455. """Parse the available options to ``celeryev``."""
  456. parser = optparse.OptionParser(option_list=OPTION_LIST)
  457. options, values = parser.parse_args(arguments)
  458. return options
  459. def main():
  460. options = parse_options(sys.argv[1:])
  461. return run_celeryev(**vars(options))
# Run the command-line entry point when this file is executed as a script.
if __name__ == "__main__":
    main()