dumper.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. from __future__ import absolute_import
  2. import sys
  3. from datetime import datetime
  4. from ..app import app_or_default
  5. from ..datastructures import LocalCache
  6. TASK_NAMES = LocalCache(0xFFF)
  7. HUMAN_TYPES = {"worker-offline": "shutdown",
  8. "worker-online": "started",
  9. "worker-heartbeat": "heartbeat"}
  10. def humanize_type(type):
  11. try:
  12. return HUMAN_TYPES[type.lower()]
  13. except KeyError:
  14. return type.lower().replace("-", " ")
  15. class Dumper(object):
  16. def on_event(self, event):
  17. timestamp = datetime.fromtimestamp(event.pop("timestamp"))
  18. type = event.pop("type").lower()
  19. hostname = event.pop("hostname")
  20. if type.startswith("task-"):
  21. uuid = event.pop("uuid")
  22. if type.startswith("task-received"):
  23. task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
  24. event.pop("name"), uuid,
  25. event.pop("args"),
  26. event.pop("kwargs"))
  27. else:
  28. task = TASK_NAMES.get(uuid, "")
  29. return self.format_task_event(hostname, timestamp,
  30. type, task, event)
  31. fields = ", ".join("%s=%s" % (key, event[key])
  32. for key in sorted(event.keys()))
  33. sep = fields and ":" or ""
  34. print("%s [%s] %s%s %s" % (hostname, timestamp,
  35. humanize_type(type), sep, fields))
  36. def format_task_event(self, hostname, timestamp, type, task, event):
  37. fields = ", ".join("%s=%s" % (key, event[key])
  38. for key in sorted(event.keys()))
  39. sep = fields and ":" or ""
  40. print("%s [%s] %s%s %s %s" % (hostname, timestamp,
  41. humanize_type(type), sep, task, fields))
  42. def evdump(app=None):
  43. sys.stderr.write("-> evdump: starting capture...\n")
  44. app = app_or_default(app)
  45. dumper = Dumper()
  46. conn = app.broker_connection()
  47. recv = app.events.Receiver(conn, handlers={"*": dumper.on_event})
  48. try:
  49. recv.capture()
  50. except (KeyboardInterrupt, SystemExit):
  51. conn and conn.close()
  52. if __name__ == "__main__":
  53. evdump()