Bladeren bron

New remote control command: Diagnose. Verify that the pool workers are able to accept and perform tasks.

Ask Solem 14 jaren geleden
bovenliggende
commit
8891626c9d
2 gewijzigde bestanden met toevoegingen van 17 en 0 verwijderingen
  1. 5 0
      celery/task/control.py
  2. 12 0
      celery/worker/control/builtins.py

+ 5 - 0
celery/task/control.py

@@ -112,6 +112,7 @@ class inspect(object):
 
     def _request(self, command, **kwargs):
         return self._prepare(broadcast(command, arguments=kwargs,
+                                      destination=self.destination,
                                       timeout=self.timeout, reply=True))
 
     def active(self, safe=False):
@@ -138,6 +139,10 @@ class inspect(object):
     def disable_events(self):
         return self._request("disable_events")
 
+    def diagnose(self):
+        diagnose_timeout = self.timeout * 0.85 # 15% of timeout
+        return self._request("diagnose", timeout=diagnose_timeout)
+
 
 @with_connection
 def broadcast(command, arguments=None, destination=None, connection=None,

+ 12 - 0
celery/worker/control/builtins.py

@@ -12,6 +12,18 @@ from celery.worker.control.registry import Panel
 TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
 
 
+@Panel.register
+def diagnose(panel, timeout=None, **kwargs):
+    info = panel.listener.pool.diagnose(timeout=timeout)
+
+    if info["waiting"]:
+        panel.logger.error("Diagnose failed: %r" % (info, ))
+        return {"error": info}
+
+    panel.logger.info("Diagnose complete: %r" % (info, ))
+    return {"ok": info}
+
+
 @Panel.register
 def revoke(panel, task_id, task_name=None, **kwargs):
     """Revoke task by task id."""