Преглед изворни кода

Added some web API handlers: /api/task/$task_id /api/task /api/task/name/$name /api/task/name

Ask Solem пре 15 година
родитељ
комит
4bc78d9bc9

+ 3 - 4
celery/monitoring/__init__.py

@@ -1,7 +1,7 @@
 from carrot.connection import DjangoBrokerConnection
 
 from celery.events import EventReceiver
-from celery.monitoring.state import MonitorState
+from celery.monitoring.state import monitor_state
 from celery.monitoring.web import WebServerThread
 
 
@@ -13,7 +13,7 @@ class MonitorListener(object):
             "worker-heartbeat": state.receive_heartbeat,
             "worker-online": state.receive_worker_event,
             "worker-offline": state.receive_worker_event,
-            "task-received": state.receive_task_event,
+            "task-received": state.receive_task_received,
             "task-accepted": state.receive_task_event,
             "task-succeeded": state.receive_task_event,
             "task-failed": state.receive_task_event,
@@ -31,8 +31,7 @@ class MonitorService(object):
         self.is_detached = is_detached
 
     def start(self):
-        state = MonitorState()
-        listener = MonitorListener(state)
+        listener = MonitorListener(monitor_state)
         webthread = WebServerThread()
         webthread.start()
 

+ 0 - 0
celery/monitoring/handlers/__init__.py


+ 41 - 0
celery/monitoring/handlers/api.py

@@ -0,0 +1,41 @@
+from tornado.web import RequestHandler
+import simplejson
+
+from celery.monitoring.state import monitor_state
+
+class APIHandler(RequestHandler):
+
+    def __init__(self, *args, **kwargs):
+        super(APIHandler, self).__init__(*args, **kwargs)
+        self.set_header("Content-Type", "application/javascript")
+
+
+class TaskStateHandler(APIHandler):
+
+    def get(self, task_id):
+        events = simplejson.dumps(monitor_state.task_events[task_id])
+        self.write(events)
+
+
+class ListTasksHandler(APIHandler):
+
+    def get(self):
+        tasks = simplejson.dumps(monitor_state.tasks_by_time())
+        self.write(tasks)
+
+
+class ListTasksByNameHandler(APIHandler):
+
+    def get(self, name):
+        tasks = simplejson.dumps(monitor_state.tasks_by_type()[name])
+        self.write(tasks)
+
+
+class ListAllTasksByNameHandler(APIHandler):
+    def get(self):
+        tasks = simplejson.dumps(monitor_state.tasks_by_type())
+        self.write(tasks)
+
+
+
+

+ 21 - 8
celery/monitoring/state.py

@@ -8,22 +8,32 @@ class MonitorState(object):
 
     def __init__(self):
         self.hearts = {}
-        self.tasks = defaultdict(lambda: [])
+        self.tasks = {}
+        self.task_events = defaultdict(lambda: [])
         self.workers = defaultdict(lambda: [])
 
     def tasks_by_type(self):
-        types = {}
-        for id, task in self.tasks:
-            types[task["name"]] = task
-        return types
+        t = defaultdict(lambda: [])
+        for id, events in self.task_events.items():
+            try:
+                task_type = self.tasks[id]["name"]
+            except KeyError:
+                pass
+            else:
+                t[task_type].append(id)
+        return t
 
     def receive_task_event(self, event):
         event["state"] = event.pop("type")
-        self.tasks[event["uuid"]].append(event)
+        self.task_events[event["uuid"]].append(event)
 
     def receive_heartbeat(self, event):
         self.hearts[event["hostname"]] = event["timestamp"]
 
+    def receive_task_received(self, event):
+        self.tasks[event["uuid"]] = event
+        self.task_events[event["uuid"]].append(event)
+
     def receive_worker_event(self, event):
         self.workers[event["hostname"]].append(event["type"])
 
@@ -36,7 +46,10 @@ class MonitorState(object):
         return False
 
     def tasks_by_time(self):
-        return sorted(self.tasks.values(), key=lambda events: events[-1].timestamp)
+        return dict(sorted(self.task_events.items(),
+                        key=lambda (uuid, events): events[-1]["timestamp"]))
 
     def tasks_by_last_state(self):
-        return [events[-1] for event in self.tasks_by_time()]
+        return [events[-1] for event in self.task_by_time()]
+
+monitor_state = MonitorState()

+ 10 - 9
celery/monitoring/web.py

@@ -2,14 +2,9 @@ import threading
 
 from tornado import httpserver
 from tornado import ioloop
+from tornado.web import Application
 
-
-
-def handle_request(request):
-    message = "You requested %s\n" % request.uri
-    request.write("HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n%s" % (
-                    len(message), message))
-    request.finish()
+from celery.monitoring.handlers import api
 
 
 class WebServerThread(threading.Thread):
@@ -19,6 +14,12 @@ class WebServerThread(threading.Thread):
         self.setDaemon(True)
 
     def run(self):
-        http_server = httpserver.HTTPServer(handle_request)
-        http_server.listen(8888)
+        application = Application([
+            (r"/api/task/name/$", api.ListAllTasksByNameHandler),
+            (r"/api/task/name/(.+?)", api.ListTasksByNameHandler),
+            (r"/api/task/$", api.ListTasksHandler),
+            (r"/api/task/(.+)", api.TaskStateHandler),
+        ])
+        http_server = httpserver.HTTPServer(application)
+        http_server.listen(8989)
         ioloop.IOLoop.instance().start()