Pārlūkot izejas kodu

Remote control command 'stats': Show detailed info about active tasks.

Ask Solem 15 gadi atpakaļ
vecāks
revīzija
233087e557
3 mainītis faili ar 40 papildinājumiem un 20 dzēšanām
  1. 6 3
      celery/worker/control/builtins.py
  2. 24 8
      celery/worker/job.py
  3. 10 9
      celery/worker/state.py

+ 6 - 3
celery/worker/control/builtins.py

@@ -3,6 +3,7 @@ from datetime import datetime
 from celery import conf
 from celery.backends import default_backend
 from celery.registry import tasks
+from celery.serialization import pickle
 from celery.utils import timeutils
 from celery.worker import state
 from celery.worker.state import revoked
@@ -124,9 +125,11 @@ def dump_active(panel, **kwargs):
 
 
 @Panel.register
-def stats(panel, **kwargs):
-    return {"active": state.active,
-            "total": state.total,
+def stats(panel, safe=False, **kwargs):
+    active_requests = [request.info(safe=safe)
+                            for request in state.active_requests]
+    return {"active": active_requests,
+            "total": state.total_count,
             "pool": panel.listener.pool.info}
 
 

+ 24 - 8
celery/worker/job.py

@@ -223,12 +223,6 @@ class TaskRequest(object):
 
         self.task = tasks[self.task_name]
 
-    def __repr__(self):
-        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
-                self.__class__.__name__,
-                self.task_name, self.task_id,
-                self.args, self.kwargs)
-
     def revoked(self):
         if self._already_revoked:
             return True
@@ -361,7 +355,7 @@ class TaskRequest(object):
         return result
 
     def on_accepted(self):
-        state.task_accepted(self.task_name)
+        state.task_accepted(self)
         if not self.task.acks_late:
             self.acknowledge()
         self.send_event("task-started", uuid=self.task_id)
@@ -369,7 +363,7 @@ class TaskRequest(object):
             self.task_name, self.task_id))
 
     def on_timeout(self, soft):
-        state.task_ready(self.task_name)
+        state.task_ready(self)
         if soft:
             self.logger.warning("Soft time limit exceeded for %s[%s]" % (
                 self.task_name, self.task_id))
@@ -429,3 +423,25 @@ class TaskRequest(object):
             subject = self.email_subject.strip() % context
             body = self.email_body.strip() % context
             mail_admins(subject, body, fail_silently=True)
+
+    def __repr__(self):
+        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
+                self.__class__.__name__,
+                self.task_name, self.task_id,
+                self.args, self.kwargs)
+
+    def info(self, safe=False):
+        args = self.args
+        kwargs = self.kwargs
+        if not safe:
+            args = repr(args)
+            kwargs = repr(self.kwargs)
+
+        return {"id": self.task_id,
+                "name": self.task_name,
+                "args": args,
+                "kwargs": kwargs,
+                "hostname": self.hostname,
+                "time_start": self.time_start,
+                "acknowledged": self.acknowledged,
+                "delivery_info": self.delivery_info}

+ 10 - 9
celery/worker/state.py

@@ -6,20 +6,21 @@ from celery.datastructures import LimitedSet
 REVOKES_MAX = 10000
 REVOKE_EXPIRES = 3600 # One hour.
 
-active = defaultdict(lambda: 0)
-total = defaultdict(lambda: 0)
+active_requests = set()
+total_count = defaultdict(lambda: 0)
 revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
 
 
-def task_accepted(task_name):
-    active[task_name] += 1
-    total[task_name] += 1
+def task_accepted(request):
+    active_requests.add(request)
+    total_count[request.task_name] += 1
 
 
-def task_ready(task_name):
-    active[task_name] -= 1
-    if active[task_name] == 0:
-        active.pop(task_name, None)
+def task_ready(request):
+    try:
+        active_requests.remove(request)
+    except IndexError:
+        pass
 
 
 class Persistent(object):