Explorar o código

TaskPool.diagnose: Verify that all pool workers are still functional.

Ask Solem %!s(int64=14) %!d(string=hai) anos
pai
achega
b0a7bf19f2
Modificáronse 1 ficheiros con 29 adicións e 0 borrados
  1. 29 0
      celery/concurrency/processes/__init__.py

+ 29 - 0
celery/concurrency/processes/__init__.py

@@ -3,6 +3,7 @@
 Process Pools.
 
 """
+from time import sleep, time
 
 from celery import log
 from celery.datastructures import ExceptionInfo
@@ -11,6 +12,10 @@ from celery.utils.functional import curry
 from celery.concurrency.processes.pool import Pool, RUN
 
 
+def pingback(i):
+    return i
+
+
 class TaskPool(object):
     """Process Pool for processing tasks in parallel.
 
@@ -66,6 +71,30 @@ class TaskPool(object):
             self._pool.terminate()
             self._pool = None
 
+    def diagnose(self, timeout=None):
+        pids = set(worker.pid for worker in self._pool._pool)
+        seen = set()
+        results = {}
+        time_start = time()
+
+        def callback(i):
+            for pid in results[i].worker_pids():
+                seen.add(pid)
+
+        i = 0
+        while pids ^ seen:
+            if time() - time_start > timeout:
+                break
+            results[i] = self._pool.apply_async(pingback,
+                                                args=(i, ),
+                                                callback=callback)
+            sleep(0.1)
+            i += 1
+
+        return {"active": list(seen),
+                "waiting": list(pids ^ seen),
+                "iterations": i}
+
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
             errbacks=None, accept_callback=None, timeout_callback=None,
             **compat):