Browse Source

Better data structures in the api, refactored the Tornado Application stuff.

Ask Solem 15 years ago
parent
commit
43cedb375b
3 changed files with 74 additions and 26 deletions
  1. 30 17
      celery/monitoring/handlers/api.py
  2. 29 2
      celery/monitoring/state.py
  3. 15 7
      celery/monitoring/web.py

+ 30 - 17
celery/monitoring/handlers/api.py

@@ -1,7 +1,7 @@
 from functools import wraps
 
 import simplejson
-from tornado.web import RequestHandler
+from tornado.web import RequestHandler, Application
 
 from celery.task import revoke
 from celery.monitoring.state import monitor_state
@@ -24,37 +24,50 @@ class APIHandler(RequestHandler):
         self.set_header("Content-Type", "application/javascript")
 
 
-class TaskStateHandler(APIHandler):
+def api_handler(fun):
 
     @JSON
-    def get(self, task_id):
-        return monitor_state.task_events[task_id]
+    def get(self, *args, **kwargs):
+        return fun(self, *args, **kwargs)
 
+    return type(fun.__name__, (APIHandler, ), {"get": get})
 
-class ListTasksHandler(APIHandler):
 
-    @JSON
-    def get(self):
-        return monitor_state.tasks_by_time()
+@api_handler
+def task_state(request, task_id):
+    return monitor_state.get_task_info(task_id)
 
 
-class ListTasksByNameHandler(APIHandler):
+@api_handler
+def list_tasks(request):
+    return monitor_state.tasks_by_time()
 
-    @JSON
-    def get(self, name):
-        return monitor_state.tasks_by_type()[name]
 
+@api_handler
+def list_tasks_by_name(request, name):
+    return monitor_state.tasks_by_type()[name]
 
-class ListAllTasksByNameHandler(APIHandler):
 
-    @JSON
-    def get(self):
-        return monitor_state.tasks_by_type()
+@api_handler
+def list_task_types(request):
+    return monitor_state.tasks_by_type()
 
 
 class RevokeTaskHandler(APIHandler):
 
+    SUPPORTED_METHODS = ["POST"]
+
     @JSON
-    def get(self, task_id):
+    def post(self):
+        task_id = self.get_argument("task_id")
         revoke(task_id)
         return {"ok": True}
+
+
+API = [
+       (r"/task/name/$", list_task_types),
+       (r"/task/name/(.+?)", list_tasks_by_name),
+       (r"/task/$", list_tasks),
+       (r"/revoke/task/", RevokeTaskHandler),
+       (r"/task/(.+)", task_state),
+]

+ 29 - 2
celery/monitoring/state.py

@@ -1,5 +1,6 @@
 import time
 from collections import defaultdict
+from datetime import datetime
 
 HEARTBEAT_EXPIRE = 120 # Heartbeats must be at most 2 minutes apart.
 
@@ -23,15 +24,41 @@ class MonitorState(object):
                 t[task_type].append(id)
         return t
 
+    def get_task_info(self, task_id):
+        task_info = dict(self.tasks[task_id])
+
+        task_events = []
+        for event in self.task_events[task_id]:
+            print("EVENT >>>>>> %s" % event)
+            if event["state"] in ("task-failed", "task-retried"):
+                task_info["exception"] = event["exception"]
+                task_info["traceback"] = event["traceback"]
+            elif event["state"] == "task-succeeded":
+                task_info["result"] = event["result"]
+            task_events.append({event["state"]: event["when"]})
+        task_info["events"] = task_events
+
+        return task_info
+
+
     def receive_task_event(self, event):
         event["state"] = event.pop("type")
+        event["when"] = self.timestamp_to_isoformat(event["timestamp"])
         self.task_events[event["uuid"]].append(event)
 
+    def timestamp_to_datetime(self, timestamp):
+        return datetime.fromtimestamp(timestamp).isoformat()
+
     def receive_heartbeat(self, event):
         self.hearts[event["hostname"]] = event["timestamp"]
 
     def receive_task_received(self, event):
-        self.tasks[event["uuid"]] = event
+        task_info = dict(event)
+        event = dict(event)
+        task_info.pop("type")
+        event["state"] = event.pop("type")
+        event["when"] = self.timestamp_to_isoformat(event["timestamp"])
+        self.tasks[task_info["uuid"]] = task_info
         self.task_events[event["uuid"]].append(event)
 
     def receive_worker_event(self, event):
@@ -47,7 +74,7 @@ class MonitorState(object):
 
     def tasks_by_time(self):
         return dict(sorted(self.task_events.items(),
-                        key=lambda (uuid, events): events[-1]["timestamp"]))
+                        key=lambda uuid__events: uuid__events[1][-1]["timestamp"]))
 
     def tasks_by_last_state(self):
         return [events[-1] for event in self.task_by_time()]

+ 15 - 7
celery/monitoring/web.py

@@ -7,6 +7,18 @@ from tornado.web import Application
 from celery.monitoring.handlers import api
 
 
+class Site(Application):
+
+    def __init__(self, applications, *args, **kwargs):
+        handlers = []
+        for urlprefix, application in applications:
+            for urlmatch, handler in application:
+                handlers.append((urlprefix + urlmatch, handler))
+        kwargs["handlers"] = handlers
+        super(Site, self).__init__(*args, **kwargs)
+
+
+
 class WebServerThread(threading.Thread):
 
     def __init__(self):
@@ -14,13 +26,9 @@ class WebServerThread(threading.Thread):
         self.setDaemon(True)
 
     def run(self):
-        application = Application([
-            (r"/api/task/name/$", api.ListAllTasksByNameHandler),
-            (r"/api/task/name/(.+?)", api.ListTasksByNameHandler),
-            (r"/api/task/$", api.ListTasksHandler),
-            (r"/api/revoke/task/(.+?)", api.RevokeTaskHandler),
-            (r"/api/task/(.+)", api.TaskStateHandler),
+        site = Site([
+            (r"/api", api.API),
         ])
-        http_server = httpserver.HTTPServer(application)
+        http_server = httpserver.HTTPServer(site)
         http_server.listen(8989)
         ioloop.IOLoop.instance().start()