Browse Source

celery.task.control.inspect: Inspect a running worker.

Examples:

    # Inspect a single worker
    >>> i = inspect("myworker.example.com")

    # Inspect several workers
    >>> i = inspect(["myworker.example.com", "myworker2.example.com"])

    # Inspect all workers consuming on this vhost.
    >>> i = inspect()

    ### Methods

    # Get currently executing tasks
    >>> i.active()

    # Get currently reserved tasks
    >>> i.reserved()

    # Get the current eta schedule
    >>> i.scheduled()

    # Worker statistics and info
    >>> i.stats()

    # List of currently revoked tasks
    >>> i.revoked()

    # List of registered tasks
    >>> i.registered_tasks()
Ask Solem 14 years ago
parent
commit
bda9e5f74f
1 changed files with 51 additions and 0 deletions
  1. 51 0
      celery/task/control.py

+ 51 - 0
celery/task/control.py

@@ -88,6 +88,57 @@ def rate_limit(task_name, rate_limit, destination=None, **kwargs):
                                    **kwargs)
 
 
+def flatten_reply(reply):
+    nodes = {}
+    for item in reply:
+        nodes.update(item)
+    return nodes
+
+
+class inspect(object):
+
+    def __init__(self, destination=None, timeout=1):
+        self.destination = destination
+        self.timeout = timeout
+
+    def _prepare(self, reply):
+        if not reply:
+            return
+        by_node = flatten_reply(reply)
+        if not isinstance(self.destination, (list, tuple)):
+            return by_node.get(self.destination)
+        return by_node
+
+    def _request(self, command, **kwargs):
+        return self._prepare(broadcast(command, arguments=kwargs,
+                                      timeout=self.timeout, reply=True))
+
+    def active(self, safe=False):
+        return self._request("dump_active", safe=safe)
+
+    def scheduled(self, safe=False):
+        return self._request("dump_schedule", safe=safe)
+
+    def reserved(self, safe=False):
+        return self._request("dump_reserved", safe=safe)
+
+    def stats(self):
+        return self._request("stats")
+
+    def revoked(self):
+        return self._request("dump_revoked")
+
+    def registered_tasks(self):
+        return self._request("dump_registered_tasks")
+
+    def enable_events(self):
+        return self._request("enable_events")
+
+    def disable_events(self):
+        return self._request("disable_events")
+
+
+
 @with_connection
 def broadcast(command, arguments=None, destination=None, connection=None,
         connect_timeout=conf.BROKER_CONNECTION_TIMEOUT, reply=False,