Browse Source

New command-line utility: celeryev - Capture and dump celery events (enable events with -E argument to celeryd)

Ask Solem 15 years ago
parent
commit
191e316c91
3 changed files with 76 additions and 1 deletions
  1. 9 0
      bin/celeryev
  2. 64 0
      celery/bin/celeryev.py
  3. 3 1
      setup.py

+ 9 - 0
bin/celeryev

@@ -0,0 +1,9 @@
+#!/usr/bin/env python
+import sys
+if not '' in sys.path:
+    sys.path.insert(0, '')
+
+from celery.bin import celeryev
+
+if __name__ == "__main__":
+    celeryev.main()

+ 64 - 0
celery/bin/celeryev.py

@@ -0,0 +1,64 @@
+import sys
+
+from datetime import datetime
+
+from celery.events import EventReceiver
+from celery.messaging import establish_connection
+from celery.datastructures import LocalCache
+
+TASK_NAMES = LocalCache(0xFFF)
+HUMAN_TYPES = {"worker-offline": "shutdown",
+               "worker-online": "started",
+               "worker-heartbeat": "heartbeat"}
+
+
+def humanize_type(type):
+    try:
+        return HUMAN_TYPES[type.lower()]
+    except KeyError:
+        return type.lower().replace("-", " ")
+
+
+def dump_event(event):
+    timestamp = datetime.fromtimestamp(event.pop("timestamp"))
+    type = event.pop("type").lower()
+    hostname = event.pop("hostname")
+    if type.startswith("task-"):
+        uuid = event.pop("uuid")
+        if type.startswith("task-received"):
+            task = TASK_NAMES[uuid] = "%s(%s) args=%s kwargs=%s" % (
+                    event.pop("name"), uuid,
+                    event.pop("args"),
+                    event.pop("kwargs"))
+        else:
+            task = TASK_NAMES.get(uuid, "")
+        return format_task_event(hostname, timestamp, type, task, event)
+    fields = ", ".join("%s=%s" % (key, event[key])
+                    for key in sorted(event.keys()))
+    sep = fields and ":" or ""
+    print("%s [%s] %s%s %s" % (hostname, timestamp,
+                                humanize_type(type), sep, fields))
+
+
+def format_task_event(hostname, timestamp, type, task, event):
+    fields = ", ".join("%s=%s" % (key, event[key])
+                    for key in sorted(event.keys()))
+    sep = fields and ":" or ""
+    print("%s [%s] %s%s %s %s" % (hostname, timestamp,
+                                humanize_type(type), sep, task, fields))
+
+def eventdump():
+    sys.stderr.write("-> celeryev: starting capture...\n")
+    conn = establish_connection()
+    recv = EventReceiver(conn, handlers={"*": dump_event})
+    try:
+        recv.capture()
+    except (KeyboardInterrupt, SystemExit):
+        conn and conn.close()
+
+
+main = eventdump
+
+
+if __name__ == "__main__":
+    main()

+ 3 - 1
setup.py

@@ -71,7 +71,8 @@ setup(
     license="BSD",
     packages=find_packages(exclude=['ez_setup', 'tests', 'tests.*']),
     scripts=["bin/celeryd", "bin/celerybeat",
-             "bin/camqadm", "bin/celeryd-multi"],
+             "bin/camqadm", "bin/celeryd-multi",
+             "bin/celeryev"],
     zip_safe=False,
     setup_requires=["nose", "nose-cover3", "unittest2>=0.4.0", "simplejson"],
     install_requires=install_requires,
@@ -95,6 +96,7 @@ setup(
             'celeryinit = celery.bin.celeryinit:main',
             'celerybeat = celery.bin.celerybeat:main',
             'camqadm = celery.bin.camqadm:main',
+            'celeryev = celery.bin.celeryev:main',
             'celeryd-multi = celery.bin.celeryd_multi:main',
             ],
     },