Ver Fonte

Stresstest improvements

Ask Solem há 12 anos atrás
pai
commit
80597f7dd5
2 ficheiros alterados com 113 adições e 32 exclusões
  1. 2 2
      celery/concurrency/processes.py
  2. 111 30
      funtests/stress/stress.py

+ 2 - 2
celery/concurrency/processes.py

@@ -665,8 +665,8 @@ class TaskPool(BasePool):
         def on_not_recovering(proc):
             # XXX Theoretically a possibility, but maybe terminate the
             # process in this case to attempt to recover that way.
-            print('<<<<<<<<<< PROCESS IS NOT RECOVERING :(')
-            raise Exception('Contact support' % (proc, ))
+            raise Exception(
+                'Process writable but cannot write. Contact support!')
 
         def _write_job(fd, job, callback=None):
             # writes job to the worker process.

+ 111 - 30
funtests/stress/stress.py

@@ -1,16 +1,21 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
-import random
 import os
+import platform
+import random
 import signal
 import sys
 
 from time import time, sleep
 
-from celery import Celery, group
+from kombu.utils.compat import OrderedDict
+
+from celery import Celery, group, VERSION_BANNER
+from celery.bin.base import Command, Option
 from celery.exceptions import TimeoutError, SoftTimeLimitExceeded
-from celery.five import range
+from celery.five import range, values
 from celery.utils.debug import blockdetection
+from celery.utils.text import indent, pluralize
 
 # Should be run with workers running using these options:
 #
@@ -29,48 +34,60 @@ from celery.utils.debug import blockdetection
 BIG = 'x' * 2 ** 20 * 8
 SMALL = 'e' * 1024
 
-celery = Celery(
+BANNER = """\
+Celery stress-suite v{version}
+
+{platform}
+
+[config]
+.> broker: {conninfo}
+
+[toc: {total} {TESTS} total]
+{toc}
+"""
+
+app = Celery(
     'stress', broker='pyamqp://', backend='redis://',
     set_as_current=False,
 )
-celery.conf.update(
+app.conf.update(
     CELERYD_PREFETCH_MULTIPLIER=1,
 )
 
 
-@celery.task
+@app.task
 def _marker(s, sep='-'):
     print('{0} {1} {2}'.format(sep * 3, s, sep * 3))
 
 
-@celery.task
+@app.task
 def add(x, y):
     return x + y
 
 
-@celery.task
+@app.task
 def any_(*args, **kwargs):
     wait = kwargs.get('sleep')
     if wait:
         sleep(wait)
 
 
-@celery.task
+@app.task
 def exiting(status=0):
     sys.exit(status)
 
 
-@celery.task
+@app.task
 def kill(sig=signal.SIGKILL):
     os.kill(os.getpid(), sig)
 
 
-@celery.task
+@app.task
 def sleeping(i):
     sleep(i)
 
 
-@celery.task
+@app.task
 def sleeping_ignore_limits(i):
     try:
         sleep(i)
@@ -78,7 +95,7 @@ def sleeping_ignore_limits(i):
         sleep(i)
 
 
-@celery.task
+@app.task
 def segfault():
     import ctypes
     ctypes.memset(0, 0, 1)
@@ -90,36 +107,97 @@ def marker(s, sep='-'):
     _marker.delay(s, sep)
 
 
-class Stresstests(object):
+class Stress(Command):
+
+    def run(self, *names, **options):
+        try:
+            return Suite(self.app,
+                         block_timeout=options.get('block_timeout'),
+            ).run(names, **options)
+        except KeyboardInterrupt:
+            pass
+
+    def get_options(self):
+        return (
+            Option('-i', '--iterations', type='int', default=50,
+                   help='Number of iterations for each test'),
+            Option('-n', '--numtests', type='int', default=None,
+                   help='Number of tests to execute'),
+            Option('-o', '--offset', type='int', default=0,
+                   help='Start at custom offset'),
+            Option('--block-timeout', type='int', default=30 * 60),
+            Option('-l', '--list', action='store_true', dest='list_all',
+                   help='List all tests'),
+        )
+
+
+class Suite(object):
 
     def __init__(self, app, block_timeout=30 * 60):
         self.app = app
         self.connerrors = self.app.connection().recoverable_connection_errors
         self.block_timeout = block_timeout
 
-    def run(self, n=50):
+        self.tests = OrderedDict(
+            (fun.__name__, fun) for fun in [
+                self.manyshort,
+                self.termbysig,
+                self.bigtasks,
+                self.smalltasks,
+                self.timelimits,
+                self.timelimits_soft,
+                self.revoketermfast,
+                self.revoketermslow,
+                self.alwayskilled,
+            ]
+        )
+
+    def run(self, names=None, iterations=50, offset=0,
+            numtests=None, list_all=False, **kw):
+        tests = self.filtertests(names)[offset:numtests or None]
+        if list_all:
+            return print(self.testlist(tests))
+        print(self.banner(tests))
         marker('Stresstest suite start', '+')
-        tests = [self.manyshort,
-                 self.termbysig,
-                 self.bigtasks,
-                 self.smalltasks,
-                 self.timelimits,
-                 self.timelimits_soft,
-                 self.revoketermfast,
-                 self.revoketermslow]
-        for test in tests:
-            self.runtest(test, n)
+        for i, test in enumerate(tests):
+            self.runtest(test, iterations, i + 1)
         marker('Stresstest suite end', '+')
 
+
+    def filtertests(self, names):
+        try:
+            return ([self.tests[n] for n in names] if names
+                    else list(values(self.tests)))
+        except KeyError as exc:
+            raise KeyError('Unknown test name: {0}'.format(exc))
+
+    def testlist(self, tests):
+        return ',\n'.join(
+            '.> {0}) {1}'.format(i + 1, t.__name__)
+            for i, t in enumerate(tests)
+        )
+
+    def banner(self, tests):
+        app = self.app
+        return BANNER.format(
+            app='{0}:0x{1:x}'.format(app.main or '__main__', id(app)),
+            version=VERSION_BANNER,
+            conninfo=app.connection().as_uri(),
+            platform=platform.platform(),
+            toc=self.testlist(tests),
+            TESTS=pluralize(len(tests), 'test'),
+            total=len(tests),
+        )
+
     def manyshort(self):
         self.join(group(add.s(i, i) for i in xrange(1000))())
 
-    def runtest(self, fun, n=50):
+    def runtest(self, fun, n=50, index=0):
         with blockdetection(self.block_timeout):
             t = time()
             i = 0
             failed = False
-            marker('{0}({1})'.format(fun.__name__, n))
+            marker('{0}: {1}({2})'.format(index, fun.__name__, n))
             try:
                 for i in range(n):
                     print(i)
@@ -146,6 +224,10 @@ class Stresstests(object):
         self._evil_groupmember(sleeping_ignore_limits, 2,
                                soft_timeout=1, timeout=1.1)
 
+    def alwayskilled(self):
+        g = group(kill.s() for _ in range(10))
+        self.join(g(), timeout=10)
+
     def _evil_groupmember(self, evil_t, *eargs, **opts):
         g1 = group(add.s(2, 2).set(**opts), evil_t.s(*eargs).set(**opts),
                    add.s(4, 4).set(**opts), add.s(8, 8).set(**opts))
@@ -187,5 +269,4 @@ class Stresstests(object):
 
 
 if __name__ == '__main__':
-    s = Stresstests(celery)
-    s.run()
+    Stress(app=app).execute_from_commandline()