Pārlūkot izejas kodu

Test for not filling multiproc write buffer

Ask Solem 11 gadi atpakaļ
vecāks
revīzija
b1d433981e
1 mainītis faili ar 24 papildinājumiem un 0 dzēšanām
  1. 24 0
      funtests/stress/testbuf.py

+ 24 - 0
funtests/stress/testbuf.py

@@ -0,0 +1,24 @@
+from __future__ import absolute_import
+
+import sys
+import time
+
+from celery.result import ResultSet
+
+from stress.app import app, sleeping
+
+
+
+def testbuf(padbytes=0, megabytes=0):
+    padding = float(padbytes) + 2 ** 20 * float(megabytes)
+    results = []
+    print('> padding: %r' % (padding, ))
+
+    for i in range(8 * 4):
+        results.append(sleeping.delay(1, kw='x' * int(padding)))
+        time.sleep(0.01)
+
+    res = ResultSet(results)
+    print(res.join())
+
+testbuf(*sys.argv[1:])