Przeglądaj źródła

Stresstests: Improvements

Ask Solem 11 lat temu
rodzic
commit
c7c2e6ad98
2 zmienionych plików z 46 dodań i 20 usunięć
  1. 6 1
      funtests/stress/stress/app.py
  2. 40 19
      funtests/stress/stress/suite.py

+ 6 - 1
funtests/stress/stress/app.py

@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
 
 import os
 import sys
@@ -106,6 +106,11 @@ def retries(self):
     return 10
 
 
+@app.task
+def unicode():
+    print('hiöäüß')
+
+
 @app.task
 def segfault():
     import ctypes

+ 40 - 19
funtests/stress/stress/suite.py

@@ -6,13 +6,13 @@ import socket
 
 from collections import namedtuple
 from itertools import count
-from time import time, sleep
+from time import sleep
 
 from kombu.utils.compat import OrderedDict
 
 from celery import group, VERSION_BANNER
 from celery.exceptions import TimeoutError
-from celery.five import range, values
+from celery.five import range, values, monotonic
 from celery.utils.debug import blockdetection
 from celery.utils.text import pluralize
 from celery.utils.timeutils import humanize_seconds
@@ -37,18 +37,25 @@ Celery stress-suite v{version}
 
 F_PROGRESS = """\
 {0.index}: {0.test.__name__}({0.iteration}/{0.total_iterations}) \
-rep#{0.repeats} elapsed:{since} \
+rep#{0.repeats} runtime: {runtime}/{elapsed} \
 """
 
 Progress = namedtuple('Progress', (
     'test', 'iteration', 'total_iterations',
-    'index', 'repeats', 'when', 'completed',
+    'index', 'repeats', 'runtime', 'elapsed', 'completed',
 ))
 
 
+class StopSuite(Exception):
+    pass
+
+
 def pstatus(p):
     return F_PROGRESS.format(
-        p, since=humanize_seconds(time() - p.when, now='0 seconds'))
+        p,
+        runtime=humanize_seconds(monotonic() - p.runtime, now='0 seconds'),
+        elapsed=humanize_seconds(monotonic() - p.elapsed, now='0 seconds'),
+    )
 
 
 def testgroup(*funs):
@@ -134,31 +141,42 @@ class Suite(object):
         self.join(group(add.s(i, i) for i in range(1000))(), timeout=10, propagate=True)
 
     def runtest(self, fun, n=50, index=0, repeats=1):
+        print('{0}: [[[{1}({2})]]]'.format(repeats, fun.__name__, n))
         with blockdetection(self.block_timeout):
-            t = time()
+            runtime = elapsed = monotonic()
             i = 0
             failed = False
-            self.progress = Progress(fun, i, n, index, repeats, t, 0)
+            self.progress = Progress(
+                fun, i, n, index, repeats, elapsed, runtime, 0,
+            )
             _marker.delay(pstatus(self.progress))
             try:
                 for i in range(n):
-                    self.progress = Progress(fun, i, n, index, repeats, t, 0)
-                    print(pstatus(self.progress), end=' ')
+                    runtime = monotonic()
+                    self.progress = Progress(
+                        fun, i + 1, n, index, repeats, runtime, elapsed, 0,
+                    )
                     try:
                         fun()
-                        print('-> done')
+                    except StopSuite:
+                        raise
                     except Exception as exc:
                         print('-> {0!r}'.format(exc))
+                        print(pstatus(self.progress))
+                    else:
+                        print(pstatus(self.progress))
             except Exception:
                 failed = True
                 raise
             finally:
                 print('{0} {1} iterations in {2}s'.format(
                     'failed after' if failed else 'completed',
-                    i + 1, humanize_seconds(time() - t),
+                    i + 1, humanize_seconds(monotonic() - elapsed),
                 ))
                 if not failed:
-                    self.progress = Progress(fun, i, n, index, repeats, t, 1)
+                    self.progress = Progress(
+                        fun, i + 1, n, index, repeats, runtime, elapsed, 1,
+                    )
 
     def termbysig(self):
         self._evil_groupmember(kill)
@@ -217,22 +235,25 @@ class Suite(object):
     def missing_results(self, r):
         return [res.id for res in r if res.id not in res.backend._cache]
 
-    def join(self, r, propagate=False, **kwargs):
-        received = [0]
+    def join(self, r, propagate=False, max_retries=1, **kwargs):
+        received = []
 
         def on_result(task_id, value):
-            received[0] += 1
+            received.append(task_id)
 
-        while 1:
+        for i in range(max_retries) if max_retries else count(0):
+            received[:] = []
             try:
                 return r.get(callback=on_result, propagate=propagate, **kwargs)
             except (socket.timeout, TimeoutError) as exc:
-                print('RECEIVED {0}/{0}'.format(received[0], len(r)))
+                waiting_for = self.missing_results(r)
                 marker(
-                    'Still waiting for {0!r}: {1!r}'.format(
-                        self.missing_results(r), exc), '!')
+                    'Still waiting for (0) [{1}]: {2!r}'.format(
+                        len(waiting_for), ','.join(waiting_for), exc), '!',
+                )
             except self.connerrors as exc:
                 marker('join: connection lost: {0!r}'.format(exc), '!')
+        raise StopSuite('Test failed: Missing task results')
 
     def dump_progress(self):
         return pstatus(self.progress) if self.progress else 'No test running'