123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- import sys
- import time
- import curses
- import atexit
- import socket
- import optparse
- import threading
- from datetime import datetime
- from itertools import count
- from celery.task import control
- from celery.events import EventReceiver
- from celery.events.state import State
- from celery.messaging import establish_connection
- from celery.datastructures import LocalCache
# Task descriptions keyed by task UUID.  ``Dumper.on_event`` stores an entry
# when a task is received and looks it up again for later events of the
# same task.
# NOTE(review): presumably LocalCache bounds the mapping to 0xFFF entries --
# confirm against celery.datastructures.
TASK_NAMES = LocalCache(0xFFF)

# Friendlier labels for the worker event types; any type not listed here is
# rendered by ``humanize_type`` with its dashes turned into spaces.
HUMAN_TYPES = {
    "worker-online": "started",
    "worker-heartbeat": "heartbeat",
    "worker-offline": "shutdown",
}

# Command-line options understood by ``celeryev`` (see ``parse_options``).
OPTION_LIST = (
    optparse.make_option(
        '-d', '--DUMP',
        dest="dump",
        action="store_true",
        help="Dump events to stdout.",
    ),
)
def humanize_type(type):
    """Return a human-readable label for the event type ``type``.

    The type is lower-cased first.  Types listed in ``HUMAN_TYPES`` map to
    their label there; any other type is returned with every ``-``
    replaced by a space.
    """
    lowered = type.lower()
    if lowered in HUMAN_TYPES:
        return HUMAN_TYPES[lowered]
    return lowered.replace("-", " ")
class Dumper(object):
    """Event handler that prints every received event as one line of text."""

    def _format_fields(self, event):
        """Render the remaining ``event`` items as ``key=value, ...``.

        Keys are emitted in sorted order.  Returns a ``(sep, fields)`` pair
        where ``sep`` is ``":"`` when there is at least one field and the
        empty string otherwise.
        """
        pairs = ["%s=%s" % (key, event[key]) for key in sorted(event)]
        fields = ", ".join(pairs)
        if fields:
            return ":", fields
        return "", fields

    def on_event(self, event):
        """Print ``event``.

        The ``timestamp``, ``type`` and ``hostname`` keys (and ``uuid`` for
        task events) are popped from ``event``; whatever is left is printed
        as extra fields.  Task events are delegated to
        ``format_task_event``.
        """
        timestamp = datetime.fromtimestamp(event.pop("timestamp"))
        kind = event.pop("type").lower()
        hostname = event.pop("hostname")

        if kind.startswith("task-"):
            uuid = event.pop("uuid")
            if kind.startswith("task-received"):
                # Remember the description so later events for the same
                # uuid can show the task name and arguments too.
                task = "%s(%s) args=%s kwargs=%s" % (
                    event.pop("name"), uuid,
                    event.pop("args"),
                    event.pop("kwargs"))
                TASK_NAMES[uuid] = task
            else:
                task = TASK_NAMES.get(uuid, "")
            return self.format_task_event(hostname, timestamp,
                                          kind, task, event)

        sep, fields = self._format_fields(event)
        print("%s [%s] %s%s %s" % (hostname, timestamp,
                                   humanize_type(kind), sep, fields))

    def format_task_event(self, hostname, timestamp, type, task, event):
        """Print one task event, including the ``task`` description."""
        sep, fields = self._format_fields(event)
        print("%s [%s] %s%s %s %s" % (hostname, timestamp,
                                      humanize_type(type), sep, task, fields))
def abbr(S, max, dots=True):
    """Shorten ``S`` so it fits a column of ``max`` characters.

    :param S: the string to shorten, or ``None``.
    :param max: the column width.
    :param dots: when true, ``"..."`` is appended to a shortened string.

    Returns ``"???"`` for ``None`` and ``S`` unchanged when it already
    fits.  Otherwise the first ``max - 3`` characters are kept, whether or
    not the dots are appended; ``abbrtask`` relies on this to leave room
    for its own three-character marker.
    """
    if S is None:
        return "???"
    if len(S) <= max:
        return S
    # Clamp at zero: a negative slice end would count from the *end* of the
    # string and keep most of it instead of dropping it.
    keep = max - 3
    if keep < 0:
        keep = 0
    head = S[:keep]
    if dots:
        return head + "..."
    return head


def abbrtask(S, max):
    """Shorten the dotted task name ``S`` to about ``max`` characters.

    The part after the last dot is kept intact and the part before it is
    truncated, with ``"[.]"`` marking the cut.  Returns ``"???"`` for
    ``None`` and ``S`` unchanged when it already fits.  A name without
    any dot is shortened with ``abbr`` instead.  The result can still
    exceed ``max`` when the last component alone is too long.
    """
    if S is None:
        return "???"
    if len(S) <= max:
        return S
    module, sep, cls = S.rpartition(".")
    if not sep:
        # No module part to abbreviate; "[.]" would only make it longer.
        return abbr(S, max)
    module = abbr(module, max - len(cls), False)
    return module + "[.]" + cls
class CursesMonitor(object):
    """Curses screen showing the tasks and workers recorded in ``state``.

    ``setupscreen`` must be called before ``draw`` or ``handle_keypress``,
    since it creates ``self.win``.  ``resetscreen`` restores the terminal.
    """

    # Length of a formatted row; filled in by the first format_row() call.
    screen_width = None
    # UUID of the highlighted task, or None when nothing is selected.
    selected_task = None
    selected_position = 0
    selected_str = "Selected: "
    # Maximum number of tasks listed on screen.
    limit = 20
    foreground = curses.COLOR_BLACK
    background = curses.COLOR_WHITE
    online_str = "Workers online: "
    help = ("Keys: j, k: Move selection up/down. "
            "r: revoke selected task. q: quit")

    def __init__(self, state):
        self.state = state

    def format_row(self, uuid, task, worker, state):
        """Return one table row with fixed-width, abbreviated columns.

        Columns are emitted in the order uuid, worker, task, state.
        """
        uuid = uuid.ljust(36)
        worker = abbr(worker, 16).ljust(16)
        task = abbrtask(task, 20).ljust(30)
        state = abbr(state, 8).ljust(8)
        row = " %s %s %s %s " % (uuid, worker, task, state)
        if self.screen_width is None:
            self.screen_width = len(row)
        return row

    def _position_in(self, tasks):
        """Return the index of the selected task in ``tasks`` (0 if absent)."""
        for index, entry in enumerate(tasks):
            if entry[0] == self.selected_task:
                return index
        return 0

    def find_position(self):
        """Return the index of the selected task among the listed tasks.

        Returns 0 when there are no tasks or the selection is not listed.
        """
        return self._position_in(self.tasks)

    def move_selection(self, reverse=False):
        """Select the next task, or the previous one when ``reverse`` is set.

        Moving past the last task selects the first one; moving before the
        first selects the last.  Does nothing when no tasks are listed.
        """
        # Take a single snapshot: ``tasks`` is recomputed on every access,
        # so reading it twice could mix two different listings.
        tasks = self.tasks
        if not tasks:
            return
        if reverse:
            step = -1
        else:
            step = 1
        pos = self._position_in(tasks) + step
        try:
            # pos == -1 deliberately indexes the last task.
            self.selected_task = tasks[pos][0]
        except IndexError:
            self.selected_task = tasks[0][0]

    def handle_keypress(self):
        """Read one pending key, if any, and act on it.

        :raises KeyboardInterrupt: when ``q`` is pressed.
        """
        try:
            key = self.win.getkey().upper()
        except curses.error:
            # Non-blocking read with no key waiting.  Only this error is
            # caught so that KeyboardInterrupt/SystemExit still propagate.
            return
        if key in ("KEY_DOWN", "J"):
            self.move_selection()
        elif key in ("KEY_UP", "K"):
            self.move_selection(reverse=True)
        elif key in ("R", ):
            self.revoke_selection()
        elif key in ("Q", ):
            # NOTE(review): draw() is called from DisplayThread.run in this
            # file, so this is raised in that thread rather than the main
            # one -- confirm it reaches eventtop() as intended.
            raise KeyboardInterrupt

    def revoke_selection(self):
        """Revoke the selected task; a no-op when nothing is selected."""
        if not self.selected_task:
            return
        control.revoke(self.selected_task)

    def draw(self):
        """Process pending input, then repaint the whole window.

        Layout: a header on row 2, one row per listed task below it, and
        the selection details, the online workers and the key help on the
        three rows above the bottom border.
        """
        self.handle_keypress()
        win = self.win
        x = 3
        row = 2
        my = win.getmaxyx()[0]
        win.erase()
        win.bkgd(" ", curses.color_pair(1))
        win.border()
        # The header labels are passed in format_row's parameter order
        # (uuid, task, worker, state); it prints worker before task.
        win.addstr(row, x, self.format_row("UUID", "TASK", "WORKER", "STATE"),
                   curses.A_STANDOUT)
        for uuid, task in self.tasks:
            if task.uuid:
                row += 1
                attr = curses.A_NORMAL
                if task.uuid == self.selected_task:
                    attr = curses.A_STANDOUT
                win.addstr(row, x,
                           self.format_row(uuid, task.name,
                                           task.worker.hostname, task.state),
                           attr)
                if task.ready:
                    task.visited = time.time()

        if self.selected_task:
            win.addstr(my - 4, x, self.selected_str, curses.A_BOLD)
            info = "Missing extended info"
            try:
                selection = self.state.tasks[self.selected_task]
            except KeyError:
                pass
            else:
                # Format a copy: the mapping may be shared with the state
                # object, and writing the formatted runtime back into it
                # would break "%.2fs" on the next redraw.
                details = dict(selection.info)
                if "runtime" in details:
                    details["runtime"] = "%.2fs" % details["runtime"]
                info = " ".join("%s=%s" % (key, value)
                                for key, value in details.items())
            win.addstr(my - 4, x + len(self.selected_str), info)
        else:
            win.addstr(my - 4, x, "No task selected", curses.A_NORMAL)

        workers = self.workers
        if workers:
            win.addstr(my - 3, x, self.online_str, curses.A_BOLD)
            win.addstr(my - 3, x + len(self.online_str),
                       ", ".join(workers), curses.A_NORMAL)
        else:
            win.addstr(my - 3, x, "No workers discovered.")
        win.addstr(my - 2, x, self.help)
        win.refresh()

    def setupscreen(self):
        """Initialise curses and create ``self.win`` in non-blocking mode."""
        self.win = curses.initscr()
        curses.start_color()
        curses.init_pair(1, self.foreground, self.background)
        curses.cbreak()
        self.win.nodelay(True)
        self.win.keypad(True)

    def resetscreen(self):
        """Leave curses mode and restore the terminal settings."""
        curses.nocbreak()
        curses.echo()
        curses.endwin()

    @property
    def tasks(self):
        """The first ``limit`` entries of ``state.tasks_by_timestamp()``."""
        return self.state.tasks_by_timestamp()[:self.limit]

    @property
    def workers(self):
        """Hostnames of the workers in ``state`` whose ``alive`` flag is set."""
        return [hostname
                for hostname, w in self.state.workers.items()
                if w.alive]
class DisplayThread(threading.Thread):
    """Thread that keeps calling ``display.draw()`` until told to stop.

    Set the ``shutdown`` attribute to a true value to end the loop, then
    ``join()`` the thread.
    """

    # Seconds to pause between two redraws.  draw() does not block, so
    # without a pause the loop would spin and repaint continuously.
    refresh_interval = 0.1

    def __init__(self, display):
        self.display = display
        self.shutdown = False
        threading.Thread.__init__(self)

    def run(self):
        """Redraw the display repeatedly until ``shutdown`` is set."""
        while not self.shutdown:
            self.display.draw()
            # Skip the pause once shutdown was requested so join() returns
            # promptly.
            if not self.shutdown:
                time.sleep(self.refresh_interval)
def eventtop():
    """Run the curses monitor until interrupted.

    Sets up the screen and the display thread, then feeds every received
    event into a ``State`` object.  ``KeyboardInterrupt`` and
    ``SystemExit`` end the loop quietly; any other exception propagates.
    In every case the display thread is stopped, the terminal is restored
    and the connection is closed first.
    """
    sys.stderr.write("-> celeryev: starting capture...\n")
    state = State()
    display = CursesMonitor(state)
    display.setupscreen()
    refresher = DisplayThread(display)
    refresher.start()
    conn = None
    try:
        # The connection is established inside the try block so that a
        # failure here still restores the terminal and stops the thread.
        conn = establish_connection()
        recv = EventReceiver(conn, handlers={"*": state.event})
        consumer = recv.consumer()
        consumer.consume()
        while True:
            try:
                conn.connection.drain_events()
            except socket.timeout:
                pass
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        refresher.shutdown = True
        refresher.join()
        display.resetscreen()
        if conn:
            conn.close()
def eventdump():
    """Print every received event to stdout until interrupted.

    ``KeyboardInterrupt`` and ``SystemExit`` end the capture quietly; any
    other exception propagates.  The connection is closed in both cases.
    """
    sys.stderr.write("-> celeryev: starting capture...\n")
    dumper = Dumper()
    conn = establish_connection()
    try:
        recv = EventReceiver(conn, handlers={"*": dumper.on_event})
        recv.capture()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        if conn:
            conn.close()
def run_celeryev(dump=False):
    """Start ``celeryev``.

    Runs ``eventdump`` when ``dump`` is true and ``eventtop`` otherwise,
    returning whatever the chosen function returns.
    """
    runner = eventtop
    if dump:
        runner = eventdump
    return runner()
def parse_options(arguments):
    """Parse the available options to ``celeryev``.

    Returns the options object produced by ``optparse``; positional
    arguments are discarded.
    """
    parser = optparse.OptionParser(option_list=OPTION_LIST)
    parsed = parser.parse_args(arguments)
    return parsed[0]
def main():
    """Command-line entry point: parse ``sys.argv`` and run ``celeryev``."""
    cli_arguments = sys.argv[1:]
    settings = vars(parse_options(cli_arguments))
    return run_celeryev(**settings)


if __name__ == "__main__":
    main()
|