Browse Source

Reorganize stresstests

Ask Solem 12 years ago
parent
commit
695decc332

+ 5 - 5
funtests/stress/README.rst

@@ -71,18 +71,18 @@ Running the client
 After the worker is running you can start the client to run the complete test
 suite::
 
-    $ python stress.py
+    $ python -m stress
 
 You can also specify which tests to run:
 
-    $ python stress.py revoketermfast revoketermslow
+    $ python -m stress revoketermfast revoketermslow
 
 Or you can start from an offset, e.g. to skip the two first tests use
 ``--offset=2``::
 
-    $ python stress.py --offset=2
+    $ python -m stress --offset=2
 
-See ``python stress.py --help`` for a list of all available options.
+See ``python -m stress --help`` for a list of all available options.
 
 
 Options
@@ -95,7 +95,7 @@ You can set the environment variable ``CSTRESS_BACKEND`` to change
 the result backend used::
 
     $ CSTRESS_BACKEND='amqp://' celery -A stress worker #...
-    $ CSTRESS_BACKEND='amqp://' python stress.py
+    $ CSTRESS_BACKEND='amqp://' python -m stress
 
 Using a custom queue
 --------------------

+ 4 - 0
funtests/stress/stress/__init__.py

@@ -0,0 +1,4 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+from .app import app

+ 37 - 0
funtests/stress/stress/__main__.py

@@ -0,0 +1,37 @@
+from __future__ import absolute_import, print_function
+
+from celery.bin.base import Command, Option
+
+from .app import app
+from .suite import Suite
+
+
+class Stress(Command):
+
+    def run(self, *names, **options):
+        try:
+            return Suite(
+                self.app,
+                block_timeout=options.get('block_timeout'),
+            ).run(names, **options)
+        except KeyboardInterrupt:
+            pass
+
+    def get_options(self):
+        return (
+            Option('-i', '--iterations', type='int', default=50,
+                   help='Number of iterations for each test'),
+            Option('-n', '--numtests', type='int', default=None,
+                   help='Number of tests to execute'),
+            Option('-o', '--offset', type='int', default=0,
+                   help='Start at custom offset'),
+            Option('--block-timeout', type='int', default=30 * 60),
+            Option('-l', '--list', action='store_true', dest='list_all',
+                   help='List all tests'),
+            Option('-r', '--repeat', type='float', default=0,
+                   help='Number of times to repeat the test suite'),
+        )
+
+
+if __name__ == '__main__':
+    Stress(app=app).execute_from_commandline()

+ 86 - 0
funtests/stress/stress/app.py

@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, print_function
+
+import os
+import sys
+import signal
+
+from kombu import Exchange, Queue
+from time import sleep
+
+from celery import Celery
+from celery.exceptions import SoftTimeLimitExceeded
+
+CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', 'c.stress')
+CSTRESS_BACKEND = os.environ.get('CSTRESS_BACKEND', 'redis://')
+
+app = Celery(
+    'stress', broker='amqp://', backend=CSTRESS_BACKEND,
+    set_as_current=False,
+)
+app.conf.update(
+    CELERYD_PREFETCH_MULTIPLIER=10,
+    CELERY_DEFAULT_QUEUE=CSTRESS_QUEUE,
+    CELERY_QUEUES=(
+        Queue(CSTRESS_QUEUE,
+              exchange=Exchange(CSTRESS_QUEUE, durable=False),
+              routing_key=CSTRESS_QUEUE,
+              durable=False, auto_delete=True),
+    ),
+)
+
+
+@app.task
+def _marker(s, sep='-'):
+    print('{0} {1} {2}'.format(sep * 3, s, sep * 3))
+
+
+@app.task
+def add(x, y):
+    return x + y
+
+
+@app.task
+def any_(*args, **kwargs):
+    wait = kwargs.get('sleep')
+    if wait:
+        sleep(wait)
+
+
+@app.task
+def exiting(status=0):
+    sys.exit(status)
+
+
+@app.task
+def kill(sig=signal.SIGKILL):
+    os.kill(os.getpid(), sig)
+
+
+@app.task
+def sleeping(i):
+    sleep(i)
+
+
+@app.task
+def sleeping_ignore_limits(i):
+    try:
+        sleep(i)
+    except SoftTimeLimitExceeded:
+        sleep(i)
+
+
+@app.task
+def segfault():
+    import ctypes
+    ctypes.memset(0, 0, 1)
+    assert False, 'should not get here'
+
+
+def marker(s, sep='-'):
+    print('{0}{1}'.format(sep, s))
+    while True:
+        try:
+            return _marker.delay(s, sep)
+        except Exception as exc:
+            print("Retrying marker.delay(). It failed to start: %s" % exc)

+ 23 - 0
funtests/stress/stress/data.py

@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+from celery.utils.debug import humanbytes
+
+
+class Data(object):
+
+    def __init__(self, label, data):
+        self.label = label
+        self.data = data
+
+    def __str__(self):
+        return '<Data: {0} {1}>'.format(
+            self.label, humanbytes(len(self.data)),
+        )
+
+    def __reduce__(self):
+        return Data, (self.label, self.data)
+    __unicode__ = __repr__ = __str__
+
+BIG = Data("BIG", 'x' * 2 ** 20 * 8)
+SMALL = Data("SMALL", 'e' * 1024)

+ 8 - 142
funtests/stress/stress.py → funtests/stress/stress/suite.py

@@ -1,54 +1,25 @@
-#!/usr/bin/env python
 from __future__ import absolute_import, print_function
 
-import os
 import platform
 import random
-import signal
-import sys
 
 from itertools import count
 from time import time, sleep
 
-from kombu import Exchange, Queue
 from kombu.utils.compat import OrderedDict
 
-from celery import Celery, group, VERSION_BANNER
-from celery.bin.base import Command, Option
-from celery.exceptions import TimeoutError, SoftTimeLimitExceeded
+from celery import group, VERSION_BANNER
+from celery.exceptions import TimeoutError
 from celery.five import range, values
-from celery.utils.debug import blockdetection, humanbytes
+from celery.utils.debug import blockdetection
 from celery.utils.text import pluralize
 from celery.utils.timeutils import humanize_seconds
 
-# Should be run with workers running using these options:
-#
-#  1) celery -A stress worker -c 1 --maxtasksperchild=1
-#  2) celery -A stress worker -c 8 --maxtasksperchild=1
-#
-#  3) celery -A stress worker -c 1
-#  4) celery -A stress worker -c 8
-#
-#  5) celery -A stress worker --autoscale=8,0
-#
-#  6) celery -A stress worker --time-limit=1
-#
-#  7) celery -A stress worker -c1 --maxtasksperchild=1 -- celery.acks_late=1
-
-class Data(object):
-
-    def __init__(self, label, data):
-        self.label = label
-        self.data = data
-
-    def __str__(self):
-        return '<Data: {0} {1}>'.format(
-            self.label, humanbytes(len(self.data)),
-        )
-    __unicode__ = __repr__ = __str__
-
-BIG = Data("BIG", 'x' * 2 ** 20 * 8)
-SMALL = Data("SMALL", 'e' * 1024)
+from .app import (
+    marker, add, any_, kill, sleeping,
+    sleeping_ignore_limits, segfault,
+)
+from .data import BIG, SMALL
 
 BANNER = """\
 Celery stress-suite v{version}
@@ -62,107 +33,6 @@ Celery stress-suite v{version}
 {toc}
 """
 
-CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', 'c.stress')
-CSTRESS_BACKEND = os.environ.get('CSTRESS_BACKEND', 'redis://')
-
-app = Celery(
-    'stress', broker='amqp://', backend=CSTRESS_BACKEND,
-    set_as_current=False,
-)
-app.conf.update(
-    CELERYD_PREFETCH_MULTIPLIER=10,
-    CELERY_DEFAULT_QUEUE=CSTRESS_QUEUE,
-    CELERY_QUEUES=(
-        Queue(CSTRESS_QUEUE,
-              exchange=Exchange(CSTRESS_QUEUE, durable=False),
-              routing_key=CSTRESS_QUEUE,
-              durable=False, auto_delete=True),
-    ),
-)
-
-
-@app.task
-def _marker(s, sep='-'):
-    print('{0} {1} {2}'.format(sep * 3, s, sep * 3))
-
-
-@app.task
-def add(x, y):
-    return x + y
-
-
-@app.task
-def any_(*args, **kwargs):
-    wait = kwargs.get('sleep')
-    if wait:
-        sleep(wait)
-
-
-@app.task
-def exiting(status=0):
-    sys.exit(status)
-
-
-@app.task
-def kill(sig=signal.SIGKILL):
-    os.kill(os.getpid(), sig)
-
-
-@app.task
-def sleeping(i):
-    sleep(i)
-
-
-@app.task
-def sleeping_ignore_limits(i):
-    try:
-        sleep(i)
-    except SoftTimeLimitExceeded:
-        sleep(i)
-
-
-@app.task
-def segfault():
-    import ctypes
-    ctypes.memset(0, 0, 1)
-    assert False, 'should not get here'
-
-
-def marker(s, sep='-'):
-    print('{0}{1}'.format(sep, s))
-    while True:
-        try:
-            return _marker.delay(s, sep)
-        except Exception as exc:
-            print("Retrying marker.delay(). It failed to start: %s" % exc)
-
-
-class Stress(Command):
-
-    def run(self, *names, **options):
-        try:
-            return Suite(
-                self.app,
-                block_timeout=options.get('block_timeout'),
-            ).run(names, **options)
-        except KeyboardInterrupt:
-            pass
-
-    def get_options(self):
-        return (
-            Option('-i', '--iterations', type='int', default=50,
-                   help='Number of iterations for each test'),
-            Option('-n', '--numtests', type='int', default=None,
-                   help='Number of tests to execute'),
-            Option('-o', '--offset', type='int', default=0,
-                   help='Start at custom offset'),
-            Option('--block-timeout', type='int', default=30 * 60),
-            Option('-l', '--list', action='store_true', dest='list_all',
-                   help='List all tests'),
-            Option('-r', '--repeat', type='float', default=0,
-                   help='Number of times to repeat the test suite'),
-        )
-
 
 class Suite(object):
 
@@ -310,7 +180,3 @@ class Suite(object):
                 marker('join timed out: {0!r}'.format(exc), '!')
             except self.connerrors as exc:
                 marker('join: connection lost: {0!r}'.format(exc), '!')
-
-
-if __name__ == '__main__':
-    Stress(app=app).execute_from_commandline()