# celeryev.py (17 KB)
  1. import sys
  2. import time
  3. import curses
  4. import socket
  5. import optparse
  6. import threading
  7. from datetime import datetime
  8. from textwrap import wrap
  9. from itertools import count
  10. from carrot.utils import rpartition
  11. import celery
  12. from celery import states
  13. from celery.task import control
  14. from celery.events import EventReceiver
  15. from celery.events.state import State
  16. from celery.messaging import establish_connection
  17. from celery.datastructures import LocalCache
# Cache of printable task descriptions keyed by task uuid; Dumper fills it
# in when a "task-received" event arrives and reads it for later events.
# NOTE(review): presumably LocalCache(0xFFF) caps the number of entries --
# confirm against celery.datastructures.
TASK_NAMES = LocalCache(0xFFF)

# Friendlier wording for the worker event types shown by Dumper.
HUMAN_TYPES = {"worker-offline": "shutdown",
               "worker-online": "started",
               "worker-heartbeat": "heartbeat"}

# Command-line options accepted by parse_options().
OPTION_LIST = (
    optparse.make_option('-d', '--DUMP',
        action="store_true", dest="dump",
        help="Dump events to stdout."),
)
  27. def humanize_type(type):
  28. try:
  29. return HUMAN_TYPES[type.lower()]
  30. except KeyError:
  31. return type.lower().replace("-", " ")
  32. class Dumper(object):
  33. def on_event(self, event):
  34. timestamp = datetime.fromtimestamp(event.pop("timestamp"))
  35. type = event.pop("type").lower()
  36. hostname = event.pop("hostname")
  37. if type.startswith("task-"):
  38. uuid = event.pop("uuid")
  39. if type.startswith("task-received"):
  40. task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
  41. event.pop("name"), uuid,
  42. event.pop("args"),
  43. event.pop("kwargs"))
  44. else:
  45. task = TASK_NAMES.get(uuid, "")
  46. return self.format_task_event(hostname, timestamp,
  47. type, task, event)
  48. fields = ", ".join("%s=%s" % (key, event[key])
  49. for key in sorted(event.keys()))
  50. sep = fields and ":" or ""
  51. print("%s [%s] %s%s %s" % (hostname, timestamp,
  52. humanize_type(type), sep, fields))
  53. def format_task_event(self, hostname, timestamp, type, task, event):
  54. fields = ", ".join("%s=%s" % (key, event[key])
  55. for key in sorted(event.keys()))
  56. sep = fields and ":" or ""
  57. print("%s [%s] %s%s %s %s" % (hostname, timestamp,
  58. humanize_type(type), sep, task, fields))
  59. def abbr(S, max, dots=True):
  60. if S is None:
  61. return "???"
  62. if len(S) > max:
  63. return dots and S[:max-3] + "..." or S[:max-3]
  64. return S
  65. def abbrtask(S, max):
  66. if S is None:
  67. return "???"
  68. if len(S) > max:
  69. module, _, cls = rpartition(S, ".")
  70. module = abbr(module, max - len(cls), False)
  71. return module + "[.]" + cls
  72. return S
  73. class CursesMonitor(object):
  74. keymap = {}
  75. win = None
  76. screen_width = None
  77. screen_delay = 0.1
  78. selected_task = None
  79. selected_position = 0
  80. selected_str = "Selected: "
  81. limit = 20
  82. foreground = curses.COLOR_BLACK
  83. background = curses.COLOR_WHITE
  84. online_str = "Workers online: "
  85. help_title = "Keys: "
  86. help = ("j:up k:down i:info t:traceback r:result c:revoke ^c: quit")
  87. greet = "celeryev %s" % celery.__version__
  88. info_str = "Info: "
  89. def __init__(self, state, keymap=None):
  90. self.keymap = keymap or self.keymap
  91. self.state = state
  92. default_keymap = {"J": self.move_selection_down,
  93. "K": self.move_selection_up,
  94. "C": self.revoke_selection,
  95. "T": self.selection_traceback,
  96. "R": self.selection_result,
  97. "I": self.selection_info,
  98. "L": self.selection_rate_limit}
  99. self.keymap = dict(default_keymap, **self.keymap)
  100. def format_row(self, uuid, worker, task, timestamp, state):
  101. my, mx = self.win.getmaxyx()
  102. mx = mx - 3
  103. uuid_max = 36
  104. if mx < 88:
  105. uuid_max = mx - 52 - 2
  106. uuid = abbr(uuid, uuid_max).ljust(uuid_max)
  107. worker = abbr(worker, 16).ljust(16)
  108. task = abbrtask(task, 16).ljust(16)
  109. state = abbr(state, 8).ljust(8)
  110. timestamp = timestamp.ljust(8)
  111. row = "%s %s %s %s %s " % (uuid, worker, task, timestamp, state)
  112. if self.screen_width is None:
  113. self.screen_width = len(row[:mx])
  114. return row[:mx]
  115. def find_position(self):
  116. if not self.tasks:
  117. return 0
  118. for i, e in enumerate(self.tasks):
  119. if self.selected_task == e[0]:
  120. return i
  121. return 0
  122. def move_selection_up(self):
  123. self.move_selection(-1)
  124. def move_selection_down(self):
  125. self.move_selection(1)
  126. def move_selection(self, direction=1):
  127. if not self.tasks:
  128. return
  129. pos = self.find_position()
  130. try:
  131. self.selected_task = self.tasks[pos + direction][0]
  132. except IndexError:
  133. self.selected_task = self.tasks[0][0]
  134. keyalias = {curses.KEY_DOWN: "J",
  135. curses.KEY_UP: "K",
  136. curses.KEY_ENTER: "I"}
  137. def handle_keypress(self):
  138. try:
  139. key = self.win.getkey().upper()
  140. except:
  141. return
  142. key = self.keyalias.get(key) or key
  143. handler = self.keymap.get(key)
  144. if handler is not None:
  145. handler()
  146. def alert(self, callback, title=None):
  147. self.win.erase()
  148. my, mx = self.win.getmaxyx()
  149. y = blank_line = count(2).next
  150. if title:
  151. self.win.addstr(y(), 3, title, curses.A_BOLD | curses.A_UNDERLINE)
  152. blank_line()
  153. callback(my, mx, y())
  154. self.win.addstr(my - 1, 0, "Press any key to continue...",
  155. curses.A_BOLD)
  156. self.win.refresh()
  157. while 1:
  158. try:
  159. return self.win.getkey().upper()
  160. except:
  161. pass
  162. def selection_rate_limit(self):
  163. if not self.selected_task:
  164. return curses.beep()
  165. task = self.state.tasks[self.selected_task]
  166. if not task.name:
  167. return curses.beep()
  168. my, mx = self.win.getmaxyx()
  169. r = "New rate limit: "
  170. self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
  171. self.win.addstr(my - 2, len(r) + 3, " " * (mx - len(r)))
  172. rlimit = self.readline(my - 2, 3 + len(r))
  173. if rlimit:
  174. reply = control.rate_limit(task.name, rlimit.strip(), reply=True)
  175. self.alert_remote_control_reply(reply)
  176. def alert_remote_control_reply(self, reply):
  177. def callback(my, mx, xs):
  178. y = count(xs).next
  179. if not reply:
  180. self.win.addstr(y(), 3, "No replies received in 1s deadline.",
  181. curses.A_BOLD + curses.color_pair(2))
  182. return
  183. for subreply in reply:
  184. curline = y()
  185. host, response = subreply.items()[0]
  186. host = "%s: " % host
  187. self.win.addstr(curline, 3, host, curses.A_BOLD)
  188. attr = curses.A_NORMAL
  189. text = ""
  190. if "error" in response:
  191. text = response["error"]
  192. attr |= curses.color_pair(2)
  193. elif "ok" in response:
  194. text = response["ok"]
  195. attr |= curses.color_pair(3)
  196. self.win.addstr(curline, 3 + len(host), text, attr)
  197. return self.alert(callback, "Remote Control Command Replies")
  198. def readline(self, x, y):
  199. buffer = str()
  200. curses.echo()
  201. try:
  202. i = 0
  203. while True:
  204. ch = self.win.getch(x, y + i)
  205. if ch != -1:
  206. if ch in (10, curses.KEY_ENTER): # enter
  207. break
  208. if ch in (27, ):
  209. buffer = str()
  210. break
  211. buffer += chr(ch)
  212. i += 1
  213. finally:
  214. curses.noecho()
  215. return buffer
  216. def revoke_selection(self):
  217. if not self.selected_task:
  218. return curses.beep()
  219. reply = control.revoke(self.selected_task, reply=True)
  220. self.alert_remote_control_reply(reply)
  221. def selection_info(self):
  222. if not self.selected_task:
  223. return
  224. def alert_callback(mx, my, xs):
  225. y = count(xs).next
  226. task = self.state.tasks[self.selected_task]
  227. info = task.info(extra=["state"])
  228. infoitems = [("args", info.pop("args", None)),
  229. ("kwargs", info.pop("kwargs", None))] + info.items()
  230. for key, value in infoitems:
  231. if key is None:
  232. continue
  233. curline = y()
  234. keys = key + ": "
  235. self.win.addstr(curline, 3, keys, curses.A_BOLD)
  236. wrapped = wrap(str(value), mx - 2)
  237. if len(wrapped) == 1:
  238. self.win.addstr(curline, len(keys) + 3, wrapped[0])
  239. else:
  240. for subline in wrapped:
  241. self.win.addstr(y(), 3, " " * 4 + subline,
  242. curses.A_NORMAL)
  243. return self.alert(alert_callback,
  244. "Task details for %s" % self.selected_task)
  245. def selection_traceback(self):
  246. if not self.selected_task:
  247. return curses.beep()
  248. task = self.state.tasks[self.selected_task]
  249. if task.state not in states.EXCEPTION_STATES:
  250. return curses.beep()
  251. def alert_callback(my, mx, xs):
  252. y = count(xs).next
  253. for line in task.traceback.split("\n"):
  254. self.win.addstr(y(), 3, line)
  255. return self.alert(alert_callback,
  256. "Task Exception Traceback for %s" % self.selected_task)
  257. def selection_result(self):
  258. if not self.selected_task:
  259. return
  260. def alert_callback(my, mx, xs):
  261. y = count(xs).next
  262. task = self.state.tasks[self.selected_task]
  263. result = getattr(task, "result", None) or getattr(task,
  264. "exception", None)
  265. for line in wrap(result, mx - 2):
  266. self.win.addstr(y(), 3, line)
  267. return self.alert(alert_callback,
  268. "Task Result for %s" % self.selected_task)
  269. def draw(self):
  270. win = self.win
  271. self.handle_keypress()
  272. x = 3
  273. y = blank_line = count(2).next
  274. my, mx = win.getmaxyx()
  275. win.erase()
  276. win.bkgd(" ", curses.color_pair(1))
  277. win.border()
  278. win.addstr(1, x, self.greet, curses.A_DIM | curses.color_pair(5))
  279. blank_line()
  280. win.addstr(y(), x, self.format_row("UUID", "TASK",
  281. "WORKER", "TIME", "STATE"),
  282. curses.A_BOLD | curses.A_UNDERLINE)
  283. tasks = self.tasks
  284. if tasks:
  285. for uuid, task in tasks:
  286. if task.uuid:
  287. state_color = self.state_colors.get(task.state)
  288. attr = curses.A_NORMAL
  289. if task.uuid == self.selected_task:
  290. attr = curses.A_STANDOUT
  291. timestamp = datetime.fromtimestamp(
  292. task.timestamp or time.time())
  293. timef = timestamp.strftime("%H:%M:%S")
  294. line = self.format_row(uuid, task.name,
  295. task.worker.hostname,
  296. timef, task.state)
  297. lineno = y()
  298. win.addstr(lineno, x, line, attr)
  299. if state_color:
  300. win.addstr(lineno, len(line) - len(task.state) + 1,
  301. task.state, state_color | attr)
  302. if task.ready:
  303. task.visited = time.time()
  304. # -- Footer
  305. blank_line()
  306. win.hline(my - 6, x, curses.ACS_HLINE, self.screen_width)
  307. # Selected Task Info
  308. if self.selected_task:
  309. win.addstr(my - 5, x, self.selected_str, curses.A_BOLD)
  310. info = "Missing extended info"
  311. try:
  312. selection = self.state.tasks[self.selected_task]
  313. except KeyError:
  314. pass
  315. else:
  316. info = selection.info(["args", "kwargs",
  317. "result", "runtime", "eta"])
  318. if "runtime" in info:
  319. info["runtime"] = "%.2fs" % info["runtime"]
  320. if "result" in info:
  321. info["result"] = abbr(info["result"], 16)
  322. info = " ".join("%s=%s" % (key, value)
  323. for key, value in info.items())
  324. win.addstr(my - 5, x + len(self.selected_str), info)
  325. else:
  326. win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)
  327. # Workers
  328. if self.workers:
  329. win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
  330. win.addstr(my - 4, x + len(self.online_str),
  331. ", ".join(self.workers), curses.A_NORMAL)
  332. else:
  333. win.addstr(my - 4, x, "No workers discovered.")
  334. # Info
  335. win.addstr(my - 3, x, self.info_str, curses.A_BOLD)
  336. win.addstr(my - 3, x + len(self.info_str),
  337. "events:%s tasks:%s workers:%s/%s" % (
  338. self.state.event_count, self.state.task_count,
  339. len([w for w in self.state.workers.values()
  340. if w.alive]),
  341. len(self.state.workers)),
  342. curses.A_DIM)
  343. # Help
  344. win.addstr(my - 2, x, self.help_title, curses.A_BOLD)
  345. win.addstr(my - 2, x + len(self.help_title), self.help, curses.A_DIM)
  346. win.refresh()
  347. def init_screen(self):
  348. self.win = curses.initscr()
  349. self.win.nodelay(True)
  350. self.win.keypad(True)
  351. curses.start_color()
  352. curses.init_pair(1, self.foreground, self.background)
  353. # exception states
  354. curses.init_pair(2, curses.COLOR_RED, self.background)
  355. # successful state
  356. curses.init_pair(3, curses.COLOR_GREEN, self.background)
  357. # revoked state
  358. curses.init_pair(4, curses.COLOR_MAGENTA, self.background)
  359. # greeting
  360. curses.init_pair(5, curses.COLOR_BLUE, self.background)
  361. # started state
  362. curses.init_pair(6, curses.COLOR_YELLOW, self.foreground)
  363. self.state_colors = {states.SUCCESS: curses.color_pair(3),
  364. states.REVOKED: curses.color_pair(4),
  365. states.STARTED: curses.color_pair(6)}
  366. for state in states.EXCEPTION_STATES:
  367. self.state_colors[state] = curses.color_pair(2)
  368. curses.cbreak()
  369. def resetscreen(self):
  370. curses.nocbreak()
  371. self.win.keypad(False)
  372. curses.echo()
  373. curses.endwin()
  374. def nap(self):
  375. curses.napms(int(self.screen_delay * 1000))
  376. @property
  377. def tasks(self):
  378. return self.state.tasks_by_timestamp()[:self.limit]
  379. @property
  380. def workers(self):
  381. return [hostname
  382. for hostname, w in self.state.workers.items()
  383. if w.alive]
  384. class DisplayThread(threading.Thread):
  385. def __init__(self, display):
  386. self.display = display
  387. self.shutdown = False
  388. threading.Thread.__init__(self)
  389. def run(self):
  390. while not self.shutdown:
  391. self.display.draw()
  392. self.display.nap()
  393. def eventtop():
  394. sys.stderr.write("-> celeryev: starting capture...\n")
  395. state = State()
  396. display = CursesMonitor(state)
  397. display.init_screen()
  398. refresher = DisplayThread(display)
  399. refresher.start()
  400. conn = establish_connection()
  401. recv = EventReceiver(conn, handlers={"*": state.event})
  402. try:
  403. consumer = recv.consumer()
  404. consumer.consume()
  405. while True:
  406. try:
  407. conn.connection.drain_events()
  408. except socket.timeout:
  409. pass
  410. except Exception:
  411. refresher.shutdown = True
  412. refresher.join()
  413. display.resetscreen()
  414. raise
  415. except (KeyboardInterrupt, SystemExit):
  416. conn and conn.close()
  417. refresher.shutdown = True
  418. refresher.join()
  419. display.resetscreen()
  420. def eventdump():
  421. sys.stderr.write("-> celeryev: starting capture...\n")
  422. dumper = Dumper()
  423. conn = establish_connection()
  424. recv = EventReceiver(conn, handlers={"*": dumper.on_event})
  425. try:
  426. recv.capture()
  427. except (KeyboardInterrupt, SystemExit):
  428. conn and conn.close()
  429. def run_celeryev(dump=False, **kwargs):
  430. if dump:
  431. return eventdump()
  432. return eventtop()
  433. def parse_options(arguments):
  434. """Parse the available options to ``celeryev``."""
  435. parser = optparse.OptionParser(option_list=OPTION_LIST)
  436. options, values = parser.parse_args(arguments)
  437. return options
  438. def main():
  439. options = parse_options(sys.argv[1:])
  440. return run_celeryev(**vars(options))
# Run the command-line entry point when executed as a script.
if __name__ == "__main__":
    main()