Browse Source

inspect stats command now collects info from installed bootsteps.Step.info

Ask Solem 12 years ago
parent
commit
e8f10d2b95

+ 9 - 0
celery/bootsteps.py

@@ -118,6 +118,12 @@ class Namespace(object):
             step.start(parent)
             debug('^-- substep ok')
 
+    def info(self, parent):
+        info = {}
+        for step in parent.steps:
+            info.update(step.info(parent) or {})
+        return info
+
     def close(self, parent):
         if self.on_close:
             self.on_close()
@@ -329,6 +335,9 @@ class Step(object):
     def alias(self):
         return self.label or _label(self)
 
+    def info(self, obj):
+        pass
+
 
 class StartStopStep(Step):
 

+ 12 - 0
celery/worker/__init__.py

@@ -11,6 +11,7 @@
 """
 from __future__ import absolute_import
 
+import os
 import socket
 import sys
 import traceback
@@ -266,6 +267,17 @@ class WorkController(configurated):
                 reload_from_cwd(sys.modules[module], reloader)
         self.pool.restart()
 
+    def info(self):
+        return {'total': self.state.total_count,
+                'pid': os.getpid(),
+                'clock': str(self.app.clock)}
+
+    def stats(self):
+        info = self.info()
+        info.update(self.namespace.info(self))
+        info.update(self.consumer.namespace.info(self.consumer))
+        return info
+
     @property
     def _state(self):
         return self.namespace.state

+ 3 - 0
celery/worker/components.py

@@ -179,6 +179,9 @@ class Pool(bootsteps.StartStopStep):
             w.hub.on_init.append(partial(self.on_poll_init, pool))
         return pool
 
+    def info(self, w):
+        return {'pool': w.pool.info}
+
 
 class Beat(bootsteps.StartStopStep):
     """Step used to embed a beat process.

+ 8 - 15
celery/worker/consumer.py

@@ -295,21 +295,6 @@ class Consumer(object):
         self.app.amqp.queues.select_remove(queue)
         self.task_consumer.cancel_by_queue(queue)
 
-    @property
-    def info(self):
-        """Returns information about this consumer instance
-        as a dict.
-
-        This is also the consumer related info returned by
-        :program:`celery inspect stats`.
-
-        """
-        conninfo = {}
-        if self.connection:
-            conninfo = self.connection.info()
-            conninfo.pop('password', None)  # don't send password.
-        return {'broker': conninfo, 'prefetch_count': self.qos.value}
-
     def on_task(self, task, task_reserved=task_reserved):
         """Handle received task.
 
@@ -396,6 +381,11 @@ class Connection(bootsteps.StartStopStep):
         if connection:
             ignore_errors(connection, connection.close)
 
+    def info(self, c):
+        info = c.connection.info()
+        info.pop('password', None)  # don't send password.
+        return info
+
 
 class Events(bootsteps.StartStopStep):
     requires = (Connection, )
@@ -477,6 +467,9 @@ class Tasks(bootsteps.StartStopStep):
             ignore_errors(c, c.task_consumer.close)
             c.task_consumer = None
 
+    def info(self, c):
+        return {'prefetch_count': c.qos.value}
+
 
 class Agent(bootsteps.StartStopStep):
     conditional = True

+ 1 - 9
celery/worker/control.py

@@ -177,15 +177,7 @@ def dump_active(panel, safe=False, **kwargs):
 
 @Panel.register
 def stats(panel, **kwargs):
-    asinfo = {}
-    if panel.consumer.controller.autoscaler:
-        asinfo = panel.consumer.controller.autoscaler.info()
-    return {'total': state.total_count,
-            'consumer': panel.consumer.info,
-            'pool': panel.consumer.pool.info,
-            'autoscaler': asinfo,
-            'pid': os.getpid(),
-            'clock': str(panel.app.clock)}
+    return panel.consumer.controller.stats()
 
 
 @Panel.register