123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- # -*- coding: utf-8 -*-
- """
- celery.events.dumper
- ~~~~~~~~~~~~~~~~~~~~
- THis is a simple program that dumps events to the console
- as they happen. Think of it like a `tcpdump` for Celery events.
- :copyright: (c) 2009 - 2012 by Ask Solem.
- :license: BSD, see LICENSE for more details.
- """
- from __future__ import absolute_import
- import sys
- from datetime import datetime
- from celery.app import app_or_default
- from celery.datastructures import LRUCache
- TASK_NAMES = LRUCache(limit=0xFFF)
- HUMAN_TYPES = {"worker-offline": "shutdown",
- "worker-online": "started",
- "worker-heartbeat": "heartbeat"}
- def humanize_type(type):
- try:
- return HUMAN_TYPES[type.lower()]
- except KeyError:
- return type.lower().replace("-", " ")
- def say(msg, out=sys.stdout):
- out.write(msg + "\n")
- class Dumper(object):
- def __init__(self, out=sys.stdout):
- self.out = out
- def say(self, msg):
- say(msg, out=self.out)
- def on_event(self, event):
- timestamp = datetime.utcfromtimestamp(event.pop("timestamp"))
- type = event.pop("type").lower()
- hostname = event.pop("hostname")
- if type.startswith("task-"):
- uuid = event.pop("uuid")
- if type in ("task-received", "task-sent"):
- task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
- event.pop("name"), uuid,
- event.pop("args"),
- event.pop("kwargs"))
- else:
- task = TASK_NAMES.get(uuid, "")
- return self.format_task_event(hostname, timestamp,
- type, task, event)
- fields = ", ".join("%s=%s" % (key, event[key])
- for key in sorted(event.keys()))
- sep = fields and ":" or ""
- self.say("%s [%s] %s%s %s" % (hostname, timestamp,
- humanize_type(type), sep, fields))
- def format_task_event(self, hostname, timestamp, type, task, event):
- fields = ", ".join("%s=%s" % (key, event[key])
- for key in sorted(event.keys()))
- sep = fields and ":" or ""
- self.say("%s [%s] %s%s %s %s" % (hostname, timestamp,
- humanize_type(type), sep, task, fields))
- def evdump(app=None, out=sys.stdout):
- app = app_or_default(app)
- dumper = Dumper(out=out)
- dumper.say("-> evdump: starting capture...")
- conn = app.broker_connection()
- recv = app.events.Receiver(conn, handlers={"*": dumper.on_event})
- try:
- recv.capture()
- except (KeyboardInterrupt, SystemExit):
- conn and conn.close()
- if __name__ == "__main__": # pragma: no cover
- evdump()
|