dumper.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.dumper
  4. ~~~~~~~~~~~~~~~~~~~~
  5. This is a simple program that dumps events to the console
  6. as they happen. Think of it like a `tcpdump` for Celery events.
  7. :copyright: (c) 2009 - 2012 by Ask Solem.
  8. :license: BSD, see LICENSE for more details.
  9. """
  10. from __future__ import absolute_import
  11. import sys
  12. from datetime import datetime
  13. from celery.app import app_or_default
  14. from celery.datastructures import LRUCache
# Maps task uuid -> formatted "name(uuid) args=... kwargs=..." description.
# Entries are stored when a task-received/task-sent event is seen, so that
# later events carrying only the uuid can still be labelled with the task.
# Capped at 0xFFF (4095) entries; presumably the least recently used entries
# are evicted first -- confirm against celery.datastructures.LRUCache.
TASK_NAMES = LRUCache(limit=0xFFF)
  16. HUMAN_TYPES = {"worker-offline": "shutdown",
  17. "worker-online": "started",
  18. "worker-heartbeat": "heartbeat"}
  19. def humanize_type(type):
  20. try:
  21. return HUMAN_TYPES[type.lower()]
  22. except KeyError:
  23. return type.lower().replace("-", " ")
  24. def say(msg, out=sys.stdout):
  25. out.write(msg + "\n")
  26. class Dumper(object):
  27. def __init__(self, out=sys.stdout):
  28. self.out = out
  29. def say(self, msg):
  30. say(msg, out=self.out)
  31. def on_event(self, event):
  32. timestamp = datetime.utcfromtimestamp(event.pop("timestamp"))
  33. type = event.pop("type").lower()
  34. hostname = event.pop("hostname")
  35. if type.startswith("task-"):
  36. uuid = event.pop("uuid")
  37. if type in ("task-received", "task-sent"):
  38. task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
  39. event.pop("name"), uuid,
  40. event.pop("args"),
  41. event.pop("kwargs"))
  42. else:
  43. task = TASK_NAMES.get(uuid, "")
  44. return self.format_task_event(hostname, timestamp,
  45. type, task, event)
  46. fields = ", ".join("%s=%s" % (key, event[key])
  47. for key in sorted(event.keys()))
  48. sep = fields and ":" or ""
  49. self.say("%s [%s] %s%s %s" % (hostname, timestamp,
  50. humanize_type(type), sep, fields))
  51. def format_task_event(self, hostname, timestamp, type, task, event):
  52. fields = ", ".join("%s=%s" % (key, event[key])
  53. for key in sorted(event.keys()))
  54. sep = fields and ":" or ""
  55. self.say("%s [%s] %s%s %s %s" % (hostname, timestamp,
  56. humanize_type(type), sep, task, fields))
  57. def evdump(app=None, out=sys.stdout):
  58. app = app_or_default(app)
  59. dumper = Dumper(out=out)
  60. dumper.say("-> evdump: starting capture...")
  61. conn = app.broker_connection()
  62. recv = app.events.Receiver(conn, handlers={"*": dumper.on_event})
  63. try:
  64. recv.capture()
  65. except (KeyboardInterrupt, SystemExit):
  66. conn and conn.close()
if __name__ == "__main__":  # pragma: no cover
    # Executed as a script: start dumping events with default arguments.
    evdump()