celeryev.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. import sys
  2. import time
  3. import curses
  4. import atexit
  5. import socket
  6. import optparse
  7. import threading
  8. from datetime import datetime
  9. from itertools import count
  10. from celery.task import control
  11. from celery.events import EventReceiver
  12. from celery.events.state import State
  13. from celery.messaging import establish_connection
  14. from celery.datastructures import LocalCache
# Maps a task uuid to its formatted "name(uuid) args=... kwargs=..." string.
# Dumper.on_event stores an entry when a task-received event arrives and
# reads it back for later events that carry the same uuid.
# NOTE(review): 0xFFF (4095) is presumably the maximum number of cached
# entries -- confirm against celery.datastructures.LocalCache.
TASK_NAMES = LocalCache(0xFFF)
  16. HUMAN_TYPES = {"worker-offline": "shutdown",
  17. "worker-online": "started",
  18. "worker-heartbeat": "heartbeat"}
  19. OPTION_LIST = (
  20. optparse.make_option('-d', '--DUMP',
  21. action="store_true", dest="dump",
  22. help="Dump events to stdout."),
  23. )
  24. def humanize_type(type):
  25. try:
  26. return HUMAN_TYPES[type.lower()]
  27. except KeyError:
  28. return type.lower().replace("-", " ")
  29. class Dumper(object):
  30. def on_event(self, event):
  31. timestamp = datetime.fromtimestamp(event.pop("timestamp"))
  32. type = event.pop("type").lower()
  33. hostname = event.pop("hostname")
  34. if type.startswith("task-"):
  35. uuid = event.pop("uuid")
  36. if type.startswith("task-received"):
  37. task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
  38. event.pop("name"), uuid,
  39. event.pop("args"),
  40. event.pop("kwargs"))
  41. else:
  42. task = TASK_NAMES.get(uuid, "")
  43. return self.format_task_event(hostname, timestamp,
  44. type, task, event)
  45. fields = ", ".join("%s=%s" % (key, event[key])
  46. for key in sorted(event.keys()))
  47. sep = fields and ":" or ""
  48. print("%s [%s] %s%s %s" % (hostname, timestamp,
  49. humanize_type(type), sep, fields))
  50. def format_task_event(self, hostname, timestamp, type, task, event):
  51. fields = ", ".join("%s=%s" % (key, event[key])
  52. for key in sorted(event.keys()))
  53. sep = fields and ":" or ""
  54. print("%s [%s] %s%s %s %s" % (hostname, timestamp,
  55. humanize_type(type), sep, task, fields))
  56. def abbr(S, max, dots=True):
  57. if S is None:
  58. return "???"
  59. if len(S) > max:
  60. return dots and S[:max-3] + "..." or S[:max-3]
  61. return S
  62. def abbrtask(S, max):
  63. if S is None:
  64. return "???"
  65. if len(S) > max:
  66. module, _, cls = rpartition(S, ".")
  67. module = abbr(module, max - len(cls), False)
  68. return module + "[.]" + cls
  69. return S
  70. class CursesMonitor(object):
  71. screen_width = None
  72. selected_task = None
  73. selected_position = 0
  74. selected_str = "Selected: "
  75. limit = 20
  76. foreground = curses.COLOR_BLACK
  77. background = curses.COLOR_WHITE
  78. online_str = "Workers online: "
  79. help = ("Keys: j, k: Move selection up/down. "
  80. "r: revoke selected task. q: quit")
  81. def __init__(self, state):
  82. self.state = state
  83. def format_row(self, uuid, task, worker, state):
  84. uuid = uuid.ljust(36)
  85. worker = abbr(worker, 16).ljust(16)
  86. task = abbrtask(task, 20).ljust(30)
  87. state = abbr(state, 8).ljust(8)
  88. row = " %s %s %s %s " % (uuid, worker, task, state)
  89. if self.screen_width is None:
  90. self.screen_width = len(row)
  91. return row
  92. def find_position(self):
  93. if not self.tasks:
  94. return 0
  95. for i, e in enumerate(self.tasks):
  96. if self.selected_task == e[0]:
  97. return i
  98. return 0
  99. def move_selection(self, reverse=False):
  100. if not self.tasks:
  101. return
  102. incr = reverse and -1 or 1
  103. pos = self.find_position() + incr
  104. try:
  105. self.selected_task = self.tasks[pos][0]
  106. except IndexError:
  107. self.selected_task = self.tasks[0][0]
  108. def handle_keypress(self):
  109. try:
  110. key = self.win.getkey().upper()
  111. except:
  112. return
  113. if key in ("KEY_DOWN", "J"):
  114. self.move_selection()
  115. elif key in ("KEY_UP", "K"):
  116. self.move_selection(reverse=True)
  117. elif key in ("R", ):
  118. self.revoke_selection()
  119. elif key in ("Q", ):
  120. raise KeyboardInterrupt
  121. def revoke_selection(self):
  122. control.revoke(self.selected_task)
  123. def draw(self):
  124. self.handle_keypress()
  125. win = self.win
  126. x = 3
  127. y = blank_line = count(2).next
  128. my, mx = win.getmaxyx()
  129. win.erase()
  130. win.bkgd(" ", curses.color_pair(1))
  131. win.border()
  132. win.addstr(y(), x, self.format_row("UUID", "TASK", "WORKER", "STATE"),
  133. curses.A_STANDOUT)
  134. for uuid, task in self.tasks:
  135. if task.uuid:
  136. attr = curses.A_NORMAL
  137. if task.uuid == self.selected_task:
  138. attr = curses.A_STANDOUT
  139. win.addstr(y(), x, self.format_row(uuid,
  140. task.name, task.worker.hostname, task.state),
  141. attr)
  142. if task.ready:
  143. task.visited = time.time()
  144. if self.selected_task:
  145. win.addstr(my - 4, x, self.selected_str, curses.A_BOLD)
  146. info = "Missing extended info"
  147. try:
  148. selection = self.state.tasks[self.selected_task]
  149. except KeyError:
  150. pass
  151. else:
  152. info = selection.info
  153. if "runtime" in info:
  154. info["runtime"] = "%.2fs" % info["runtime"]
  155. info = " ".join("%s=%s" % (key, value)
  156. for key, value in info.items())
  157. win.addstr(my - 4, x + len(self.selected_str), info)
  158. else:
  159. win.addstr(my - 4, x, "No task selected", curses.A_NORMAL)
  160. if self.workers:
  161. win.addstr(my - 3, x, self.online_str, curses.A_BOLD)
  162. win.addstr(my - 3, x + len(self.online_str),
  163. ", ".join(self.workers), curses.A_NORMAL)
  164. else:
  165. win.addstr(my - 3, x, "No workers discovered.")
  166. win.addstr(my - 2, x, self.help)
  167. win.refresh()
  168. def setupscreen(self):
  169. self.win = curses.initscr()
  170. curses.start_color()
  171. curses.init_pair(1, self.foreground, self.background)
  172. curses.cbreak()
  173. self.win.nodelay(True)
  174. self.win.keypad(True)
  175. def resetscreen(self):
  176. curses.nocbreak()
  177. curses.echo()
  178. curses.endwin()
  179. @property
  180. def tasks(self):
  181. return self.state.tasks_by_timestamp()[:self.limit]
  182. @property
  183. def workers(self):
  184. return [hostname
  185. for hostname, w in self.state.workers.items()
  186. if w.alive]
  187. class DisplayThread(threading.Thread):
  188. def __init__(self, display):
  189. self.display = display
  190. self.shutdown = False
  191. threading.Thread.__init__(self)
  192. def run(self):
  193. while not self.shutdown:
  194. self.display.draw()
  195. def eventtop():
  196. sys.stderr.write("-> celeryev: starting capture...\n")
  197. state = State()
  198. display = CursesMonitor(state)
  199. display.setupscreen()
  200. refresher = DisplayThread(display)
  201. refresher.start()
  202. conn = establish_connection()
  203. recv = EventReceiver(conn, handlers={"*": state.event})
  204. try:
  205. consumer = recv.consumer()
  206. consumer.consume()
  207. while True:
  208. try:
  209. conn.connection.drain_events()
  210. except socket.timeout:
  211. pass
  212. except Exception, exc:
  213. refresher.shutdown = True
  214. refresher.join()
  215. display.resetscreen()
  216. raise
  217. except (KeyboardInterrupt, SystemExit):
  218. conn and conn.close()
  219. refresher.shutdown = True
  220. refresher.join()
  221. display.resetscreen()
  222. def eventdump():
  223. sys.stderr.write("-> celeryev: starting capture...\n")
  224. dumper = Dumper()
  225. conn = establish_connection()
  226. recv = EventReceiver(conn, handlers={"*": dumper.on_event})
  227. try:
  228. recv.capture()
  229. except (KeyboardInterrupt, SystemExit):
  230. conn and conn.close()
  231. def run_celeryev(dump=False):
  232. if dump:
  233. return eventdump()
  234. return eventtop()
  235. def parse_options(arguments):
  236. """Parse the available options to ``celeryev``."""
  237. parser = optparse.OptionParser(option_list=OPTION_LIST)
  238. options, values = parser.parse_args(arguments)
  239. return options
  240. def main():
  241. options = parse_options(sys.argv[1:])
  242. return run_celeryev(**vars(options))
# Run the monitor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()