Browse Source

Removed all traces of the pool diagnose command

Ask Solem 14 năm trước cách đây
mục cha
commit
557e8e4d92

+ 0 - 26
celery/concurrency/processes/__init__.py

@@ -76,32 +76,6 @@ class TaskPool(object):
             self._pool.terminate()
             self._pool = None
 
-    def diagnose(self, timeout=None):
-        pids = set(worker.pid for worker in self._pool._pool)
-        seen = set()
-        results = {}
-        time_start = time()
-
-        def callback(i):
-            for pid in results[i].worker_pids():
-                seen.add(pid)
-
-        i = 0
-        while pids ^ seen:
-            print("%r > %r" % (time() - time_start, timeout))
-            if timeout and time() - time_start > timeout:
-                print("TIMED OUT i==%r" % (i, ))
-                break
-            results[i] = self._pool.apply_async(pingback,
-                                                args=(i, ),
-                                                callback=callback)
-            sleep(0.1)
-            i += 1
-
-        return {"active": list(seen),
-                "waiting": list(pids ^ seen),
-                "iterations": i}
-
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
             errbacks=None, accept_callback=None, timeout_callback=None,
             **compat):

+ 0 - 10
celery/tests/test_concurrency_processes.py

@@ -176,13 +176,3 @@ class test_TaskPool(unittest.TestCase):
         info = pool.info
         self.assertEqual(info["max-concurrency"], pool.limit)
         self.assertEqual(len(info["processes"]), pool.limit)
-
-    @skip_if_quick
-    def test_diagnose(self):
-        pool = ExeMockTaskPool(10)
-        pool.start()
-
-        r = pool.diagnose(timeout=None)
-        self.assertEqual(len(r["active"]), pool.limit)
-        self.assertFalse(r["waiting"])
-        self.assertTrue(r["iterations"])

+ 0 - 12
celery/worker/control/builtins.py

@@ -9,18 +9,6 @@ from celery.worker.control.registry import Panel
 TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
 
 
-@Panel.register
-def diagnose(panel, timeout=None, **kwargs):
-    info = panel.listener.pool.diagnose(timeout=timeout)
-
-    if info["waiting"]:
-        panel.logger.error("Diagnose failed: %r" % (info, ))
-        return {"error": info}
-
-    panel.logger.info("Diagnose complete: %r" % (info, ))
-    return {"ok": info}
-
-
 @Panel.register
 def revoke(panel, task_id, **kwargs):
     """Revoke task by task id."""