dumper.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.dumper
  4. ~~~~~~~~~~~~~~~~~~~~
  5. THis is a simple program that dumps events to the console
  6. as they happen. Think of it like a `tcpdump` for Celery events.
  7. :copyright: (c) 2009 - 2011 by Ask Solem.
  8. :license: BSD, see LICENSE for more details.
  9. """
  10. from __future__ import absolute_import
  11. import sys
  12. from datetime import datetime
  13. from ..app import app_or_default
  14. from ..datastructures import LRUCache
  15. TASK_NAMES = LRUCache(limit=0xFFF)
  16. HUMAN_TYPES = {"worker-offline": "shutdown",
  17. "worker-online": "started",
  18. "worker-heartbeat": "heartbeat"}
  19. def humanize_type(type):
  20. try:
  21. return HUMAN_TYPES[type.lower()]
  22. except KeyError:
  23. return type.lower().replace("-", " ")
  24. class Dumper(object):
  25. def on_event(self, event):
  26. timestamp = datetime.utcfromtimestamp(event.pop("timestamp"))
  27. type = event.pop("type").lower()
  28. hostname = event.pop("hostname")
  29. if type.startswith("task-"):
  30. uuid = event.pop("uuid")
  31. if type in ("task-received", "task-sent"):
  32. task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
  33. event.pop("name"), uuid,
  34. event.pop("args"),
  35. event.pop("kwargs"))
  36. else:
  37. task = TASK_NAMES.get(uuid, "")
  38. return self.format_task_event(hostname, timestamp,
  39. type, task, event)
  40. fields = ", ".join("%s=%s" % (key, event[key])
  41. for key in sorted(event.keys()))
  42. sep = fields and ":" or ""
  43. print("%s [%s] %s%s %s" % (hostname, timestamp,
  44. humanize_type(type), sep, fields))
  45. def format_task_event(self, hostname, timestamp, type, task, event):
  46. fields = ", ".join("%s=%s" % (key, event[key])
  47. for key in sorted(event.keys()))
  48. sep = fields and ":" or ""
  49. print("%s [%s] %s%s %s %s" % (hostname, timestamp,
  50. humanize_type(type), sep, task, fields))
  51. def evdump(app=None):
  52. sys.stderr.write("-> evdump: starting capture...\n")
  53. app = app_or_default(app)
  54. dumper = Dumper()
  55. conn = app.broker_connection()
  56. recv = app.events.Receiver(conn, handlers={"*": dumper.on_event})
  57. try:
  58. recv.capture()
  59. except (KeyboardInterrupt, SystemExit):
  60. conn and conn.close()
  61. if __name__ == "__main__":
  62. evdump()