Browse Source

Added API list_worker_tasks(hostname)

Ask Solem 15 years ago
parent
commit
23333387c9
2 changed files with 15 additions and 0 deletions
  1. 6 0
      celery/monitoring/handlers/api.py
  2. 9 0
      celery/monitoring/state.py

+ 6 - 0
celery/monitoring/handlers/api.py

@@ -58,6 +58,11 @@ def list_workers(request):
     return monitor_state.list_workers()
 
 
+@api_handler
+def list_worker_tasks(request, hostname):
+    return monitor_state.list_worker_tasks(hostname)
+
+
 class RevokeTaskHandler(APIHandler):
 
     SUPPORTED_METHODS = ["POST"]
@@ -76,4 +81,5 @@ API = [
        (r"/revoke/task/", RevokeTaskHandler),
        (r"/task/(.+)", task_state),
        (r"/worker/", list_workers),
+       (r"/worker/(.+?)/tasks", list_worker_tasks),
 ]

+ 9 - 0
celery/monitoring/state.py

@@ -66,6 +66,15 @@ class MonitorState(object):
                 alive_workers.append({hostname: events[-1]["when"]})
         return alive_workers
 
+    def list_worker_tasks(self, hostname):
+        alive_workers = self.list_workers()
+        tasks_for_worker = defaultdict(lambda: [])
+        for hostname, when in alive_workers.items():
+            for task_id, task_info in self.tasks:
+                if task_info["hostname"] == hostname:
+                    tasks_for_worker[hostname].append(task_id)
+        return tasks_for_worker
+
     def receive_worker_event(self, event):
         event["state"] = event.pop("type")
         event["when"] = self.timestamp_to_isoformat(event["timestamp"])