|
@@ -1,8 +1,9 @@
|
|
|
-from __future__ import absolute_import, print_function
|
|
|
+from __future__ import absolute_import, print_function, unicode_literals
|
|
|
|
|
|
import platform
|
|
|
import random
|
|
|
|
|
|
+from collections import namedtuple
|
|
|
from itertools import count
|
|
|
from time import time, sleep
|
|
|
|
|
@@ -16,7 +17,7 @@ from celery.utils.text import pluralize
|
|
|
from celery.utils.timeutils import humanize_seconds
|
|
|
|
|
|
from .app import (
|
|
|
- marker, add, any_, kill, sleeping,
|
|
|
+ marker, _marker, add, any_, kill, sleeping,
|
|
|
sleeping_ignore_limits, segfault,
|
|
|
)
|
|
|
from .data import BIG, SMALL
|
|
@@ -33,6 +34,21 @@ Celery stress-suite v{version}
|
|
|
{toc}
|
|
|
"""
|
|
|
|
|
|
+F_PROGRESS = """\
|
|
|
+{0.index}: {0.test.__name__}({0.iteration}/{0.total_iterations}) \
|
|
|
+rep#{0.repeats} elapsed:{since} \
|
|
|
+"""
|
|
|
+
|
|
|
+Progress = namedtuple('Progress', (
|
|
|
+ 'test', 'iteration', 'total_iterations',
|
|
|
+ 'index', 'repeats', 'when', 'completed',
|
|
|
+))
|
|
|
+
|
|
|
+
|
|
|
+def pstatus(p):
|
|
|
+ return F_PROGRESS.format(
|
|
|
+ p, since=humanize_seconds(time() - p.when, now='0 seconds'))
|
|
|
+
|
|
|
|
|
|
class Suite(object):
|
|
|
|
|
@@ -40,6 +56,7 @@ class Suite(object):
|
|
|
self.app = app
|
|
|
self.connerrors = self.app.connection().recoverable_connection_errors
|
|
|
self.block_timeout = block_timeout
|
|
|
+ self.progress = None
|
|
|
|
|
|
self.tests = OrderedDict(
|
|
|
(fun.__name__, fun) for fun in [
|
|
@@ -68,7 +85,7 @@ class Suite(object):
|
|
|
'+',
|
|
|
)
|
|
|
for j, test in enumerate(tests):
|
|
|
- self.runtest(test, iterations, j + 1)
|
|
|
+ self.runtest(test, iterations, j + 1, i + 1)
|
|
|
marker(
|
|
|
'Stresstest suite end (repetition {0})'.format(i + 1),
|
|
|
'+',
|
|
@@ -102,20 +119,22 @@ class Suite(object):
|
|
|
def manyshort(self):
|
|
|
self.join(group(add.s(i, i) for i in xrange(1000))(), propagate=True)
|
|
|
|
|
|
- def runtest(self, fun, n=50, index=0):
|
|
|
+ def runtest(self, fun, n=50, index=0, repeats=1):
|
|
|
with blockdetection(self.block_timeout):
|
|
|
t = time()
|
|
|
i = 0
|
|
|
failed = False
|
|
|
- marker('{0}: {1}({2})'.format(index, fun.__name__, n))
|
|
|
+ self.progress = Progress(fun, i, n, index, repeats, t, 0)
|
|
|
+ _marker.delay(pstatus(self.progress))
|
|
|
try:
|
|
|
for i in range(n):
|
|
|
- print('{0} ({1})'.format(i, fun.__name__), end=' ')
|
|
|
+ self.progress = Progress(fun, i, n, index, repeats, t, 0)
|
|
|
+ print(pstatus(self.progress), end=' ')
|
|
|
try:
|
|
|
fun()
|
|
|
print('-> done')
|
|
|
except Exception as exc:
|
|
|
- print('-> {}'.format(exc))
|
|
|
+ print('-> {0!r}'.format(exc))
|
|
|
except Exception:
|
|
|
failed = True
|
|
|
raise
|
|
@@ -124,6 +143,8 @@ class Suite(object):
|
|
|
'failed after' if failed else 'completed',
|
|
|
i + 1, humanize_seconds(time() - t),
|
|
|
))
|
|
|
+ if not failed:
|
|
|
+ self.progress = Progress(fun, i, n, index, repeats, t, 1)
|
|
|
|
|
|
def termbysig(self):
|
|
|
self._evil_groupmember(kill)
|
|
@@ -173,7 +194,7 @@ class Suite(object):
|
|
|
self.join(r, timeout=5)
|
|
|
|
|
|
def missing_results(self, r):
|
|
|
- return [result.id for result in r if result.id not in result.backend._cache]
|
|
|
+ return [res.id for res in r if res.id not in res.backend._cache]
|
|
|
|
|
|
def join(self, r, propagate=False, **kwargs):
|
|
|
while 1:
|
|
@@ -185,3 +206,6 @@ class Suite(object):
|
|
|
self.missing_results(r), exc), '!')
|
|
|
except self.connerrors as exc:
|
|
|
marker('join: connection lost: {0!r}'.format(exc), '!')
|
|
|
+
|
|
|
+ def dump_progress(self):
|
|
|
+ return pstatus(self.progress) if self.progress else 'No test running'
|