celeryev.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. import sys
  2. from datetime import datetime
  3. from celery.events import EventReceiver
  4. from celery.messaging import establish_connection
  5. from celery.datastructures import LocalCache
  6. TASK_NAMES = LocalCache(0xFFF)
  7. HUMAN_TYPES = {"worker-offline": "shutdown",
  8. "worker-online": "started",
  9. "worker-heartbeat": "heartbeat"}
  10. def humanize_type(type):
  11. try:
  12. return HUMAN_TYPES[type.lower()]
  13. except KeyError:
  14. return type.lower().replace("-", " ")
  15. def dump_event(event):
  16. timestamp = datetime.fromtimestamp(event.pop("timestamp"))
  17. type = event.pop("type").lower()
  18. hostname = event.pop("hostname")
  19. if type.startswith("task-"):
  20. uuid = event.pop("uuid")
  21. if type.startswith("task-received"):
  22. task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
  23. event.pop("name"), uuid,
  24. event.pop("args"),
  25. event.pop("kwargs"))
  26. else:
  27. task = TASK_NAMES.get(uuid, "")
  28. return format_task_event(hostname, timestamp, type, task, event)
  29. fields = ", ".join("%s=%s" % (key, event[key])
  30. for key in sorted(event.keys()))
  31. sep = fields and ":" or ""
  32. print("%s [%s] %s%s %s" % (hostname, timestamp,
  33. humanize_type(type), sep, fields))
  34. def format_task_event(hostname, timestamp, type, task, event):
  35. fields = ", ".join("%s=%s" % (key, event[key])
  36. for key in sorted(event.keys()))
  37. sep = fields and ":" or ""
  38. print("%s [%s] %s%s %s %s" % (hostname, timestamp,
  39. humanize_type(type), sep, task, fields))
  40. def eventdump():
  41. sys.stderr.write("-> celeryev: starting capture...\n")
  42. conn = establish_connection()
  43. recv = EventReceiver(conn, handlers={"*": dump_event})
  44. try:
  45. recv.capture()
  46. except (KeyboardInterrupt, SystemExit):
  47. conn and conn.close()
  48. main = eventdump
  49. if __name__ == "__main__":
  50. main()