Parcourir la source

Show listener info in the 'stats' control command

Ask Solem il y a 14 ans
Parent
commit
a4ccfc2410
3 fichiers modifiés avec 17 ajouts et 2 suppressions
  1. 3 2
      celery/app/amqp.py
  2. 1 0
      celery/worker/control/builtins.py
  3. 13 0
      celery/worker/listener.py

+ 3 - 2
celery/app/amqp.py

@@ -230,8 +230,9 @@ class AMQP(object):
         q = self.app.conf.CELERY_DEFAULT_QUEUE
         return q, self.queues[q]
 
-    def get_broker_info(self):
-        broker_connection = self.app.broker_connection()
+    def get_broker_info(self, broker_connection=None):
+        if broker_connection is None:
+            broker_connection = self.app.broker_connection()
         carrot_backend = broker_connection.backend_cls
         if carrot_backend and not isinstance(carrot_backend, str):
             carrot_backend = carrot_backend.__name__

+ 1 - 0
celery/worker/control/builtins.py

@@ -151,6 +151,7 @@ def dump_active(panel, safe=False, **kwargs):
 @Panel.register
 def stats(panel, **kwargs):
     return {"total": state.total_count,
+            "listener": panel.listener.info,
             "pool": panel.listener.pool.info}
 
 

+ 13 - 0
celery/worker/listener.py

@@ -90,6 +90,11 @@ from celery.utils import noop, retry_over_time
 from celery.worker.job import TaskRequest, InvalidTaskError
 from celery.worker.control import ControlDispatch
 from celery.worker.heartbeat import Heart
+from celery.events import EventDispatcher
+from celery.messaging import establish_connection
+from celery.messaging import get_consumer_set, BroadcastConsumer
+from celery.exceptions import NotRegistered
+from celery.datastructures import SharedCounter
 
 RUN = 0x1
 CLOSE = 0x2
@@ -460,3 +465,11 @@ class CarrotListener(object):
         """
         self.logger.debug("CarrotListener: Stopping consumers...")
         self.stop_consumers(close=False)
+
+    @property
+    def info(self):
+        conninfo = {}
+        if self.connection:
+            conninfo = self.app.amqp.get_broker_info(self.connection)
+        return {"broker": conninfo,
+                "prefetch_count": self.qos.next}