Browse Source

Refactored API handlers and added new api: /api/revoke/task/$task_id

Ask Solem 15 years ago
parent
commit
6b34bc5aee
4 changed files with 36 additions and 16 deletions
  1. 3 3
      celery/messaging.py
  2. 29 10
      celery/monitoring/handlers/api.py
  3. 1 0
      celery/monitoring/web.py
  4. 3 3
      celery/task/__init__.py

+ 3 - 3
celery/messaging.py

@@ -100,7 +100,7 @@ class EventConsumer(Consumer):
 
 
 class BroadcastPublisher(Publisher):
-    exchange = "celerycast"
+    exchange = "celeryctl"
     exchange_type = "fanout"
     routing_key = ""
 
@@ -109,8 +109,8 @@ class BroadcastPublisher(Publisher):
 
 
 class BroadcastConsumer(Consumer):
-    queue = "celerycast"
-    exchange = "celerycast"
+    queue = "celeryctl"
+    exchange = "celeryctl"
     routing_key = ""
     exchange_type = "fanout"
     no_ack = True

+ 29 - 10
celery/monitoring/handlers/api.py

@@ -1,8 +1,22 @@
-from tornado.web import RequestHandler
+from functools import wraps
+
 import simplejson
+from tornado.web import RequestHandler
 
+from celery.task import revoke
 from celery.monitoring.state import monitor_state
 
+
+def JSON(fun):
+
+    @wraps(fun)
+    def _write_json(self, *args, **kwargs):
+        content = fun(self, *args, **kwargs)
+        self.write(simplejson.dumps(content))
+
+    return _write_json
+
+
 class APIHandler(RequestHandler):
 
     def __init__(self, *args, **kwargs):
@@ -12,30 +26,35 @@ class APIHandler(RequestHandler):
 
 class TaskStateHandler(APIHandler):
 
+    @JSON
     def get(self, task_id):
-        events = simplejson.dumps(monitor_state.task_events[task_id])
-        self.write(events)
+        return monitor_state.task_events[task_id]
 
 
 class ListTasksHandler(APIHandler):
 
+    @JSON
     def get(self):
-        tasks = simplejson.dumps(monitor_state.tasks_by_time())
-        self.write(tasks)
+        return monitor_state.tasks_by_time()
 
 
 class ListTasksByNameHandler(APIHandler):
 
+    @JSON
     def get(self, name):
-        tasks = simplejson.dumps(monitor_state.tasks_by_type()[name])
-        self.write(tasks)
+        return monitor_state.tasks_by_type()[name]
 
 
 class ListAllTasksByNameHandler(APIHandler):
-    def get(self):
-        tasks = simplejson.dumps(monitor_state.tasks_by_type())
-        self.write(tasks)
 
+    @JSON
+    def get(self):
+        return monitor_state.tasks_by_type()
 
 
+class RevokeTaskHandler(APIHandler):
 
+    @JSON
+    def get(self, task_id):
+        revoke(task_id)
+        return {"ok": True}

+ 1 - 0
celery/monitoring/web.py

@@ -18,6 +18,7 @@ class WebServerThread(threading.Thread):
             (r"/api/task/name/$", api.ListAllTasksByNameHandler),
             (r"/api/task/name/(.+?)", api.ListTasksByNameHandler),
             (r"/api/task/$", api.ListTasksHandler),
+            (r"/api/revoke/task/(.+?)", api.RevokeTaskHandler),
             (r"/api/task/(.+)", api.TaskStateHandler),
         ])
         http_server = httpserver.HTTPServer(application)

+ 3 - 3
celery/task/__init__.py

@@ -10,7 +10,7 @@ from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.execute import apply_async
 from celery.registry import tasks
 from celery.backends import default_backend
-from celery.messaging import TaskConsumer, with_connection
+from celery.messaging import TaskConsumer, BroadcastPublisher, with_connection
 from celery.task.base import Task, TaskSet, PeriodicTask
 from celery.task.base import ExecuteRemoteTask, AsynchronousMapTask
 from celery.task.rest import RESTProxyTask
@@ -43,9 +43,9 @@ def revoke(task_id, connection=None, connect_timeout=None):
     """
 
     def _revoke(connection):
-        broadcast = BroadcastPublisher(conn)
+        broadcast = BroadcastPublisher(connection)
         try:
-            broadcast.revoke(uuid)
+            broadcast.revoke(task_id)
         finally:
             broadcast.close()