소스 검색

Stresstests: Refactors Suite into BaseSuite and Suite

Ask Solem 11 년 전
부모
커밋
59ece7d5f2
1개의 변경된 파일98개의 추가작업 그리고 66개의 파일을 삭제
  1. 98 66
      funtests/stress/stress/suite.py

+ 98 - 66
funtests/stress/stress/suite.py

@@ -1,11 +1,12 @@
 from __future__ import absolute_import, print_function, unicode_literals
 
+import inspect
 import platform
 import random
 import socket
 import sys
 
-from collections import namedtuple
+from collections import defaultdict, namedtuple
 from itertools import count
 from time import sleep
 
@@ -13,14 +14,14 @@ from kombu.utils.compat import OrderedDict
 
 from celery import group, VERSION_BANNER
 from celery.exceptions import TimeoutError
-from celery.five import range, values, monotonic
+from celery.five import items, monotonic, range, values
 from celery.utils.debug import blockdetection
 from celery.utils.text import pluralize, truncate
 from celery.utils.timeutils import humanize_seconds
 
 from .app import (
     marker, _marker, add, any_, exiting, kill, sleeping,
-    sleeping_ignore_limits, segfault, any_returning,
+    sleeping_ignore_limits, any_returning,
 )
 from .data import BIG, SMALL
 from .fbi import FBI
@@ -83,7 +84,7 @@ def testgroup(*funs):
     return OrderedDict((fun.__name__, fun) for fun in funs)
 
 
-class Suite(object):
+class BaseSuite(object):
 
     def __init__(self, app, block_timeout=30 * 60):
         self.app = app
@@ -92,30 +93,26 @@ class Suite(object):
         self.progress = None
         self.speaker = Speaker()
         self.fbi = FBI(app)
+        self.init_groups()
 
-        self.groups = {
-            'all': testgroup(
-                self.manyshort,
-                self.termbysig,
-                self.bigtasks,
-                self.bigtasksbigvalue,
-                self.smalltasks,
-                self.timelimits,
-                self.timelimits_soft,
-                self.revoketermfast,
-                self.revoketermslow,
-                self.alwayskilled,
-                self.alwaysexits,
-            ),
-            'green': testgroup(
-                self.manyshort,
-                self.bigtasks,
-                self.bigtasksbigvalue,
-                self.smalltasks,
-                self.alwaysexits,
-                self.group_with_exit,
-            ),
-        }
+    def init_groups(self):
+        acc = defaultdict(list)
+        for attr in dir(self):
+            if not _is_descriptor(self, attr):
+                meth = getattr(self, attr)
+                try:
+                    groups = meth.__func__.__testgroup__
+                except AttributeError:
+                    pass
+                else:
+                    for group in groups:
+                        acc[group].append(meth)
+        # sort the tests by the order in which they are defined in the class
+        for group in values(acc):
+            group[:] = sorted(group, key=lambda m: m.__func__.__testsort__)
+        self.groups = dict(
+            (name, testgroup(*tests)) for name, tests in items(acc)
+        )
 
     def run(self, names=None, iterations=50, offset=0,
             numtests=None, list_all=False, repeat=0, group='all',
@@ -167,10 +164,6 @@ class Suite(object):
             total=len(tests),
         )
 
-    def manyshort(self):
-        self.join(group(add.s(i, i) for i in range(1000))(),
-                  timeout=10, propagate=True)
-
     def runtest(self, fun, n=50, index=0, repeats=1):
         print('{0}: [[[{1}({2})]]]'.format(repeats, fun.__name__, n))
         with blockdetection(self.block_timeout):
@@ -211,26 +204,92 @@ class Suite(object):
                             fun, i + 1, n, index, repeats, runtime, elapsed, 1,
                         )
 
+    def missing_results(self, r):
+        return [res.id for res in r if res.id not in res.backend._cache]
+
+    def join(self, r, propagate=False, max_retries=10, **kwargs):
+        if self.no_join:
+            return
+        received = []
+
+        def on_result(task_id, value):
+            received.append(task_id)
+
+        for i in range(max_retries) if max_retries else count(0):
+            received[:] = []
+            try:
+                return r.get(callback=on_result, propagate=propagate, **kwargs)
+            except (socket.timeout, TimeoutError) as exc:
+                waiting_for = self.missing_results(r)
+                self.speaker.beep()
+                marker(
+                    'Still waiting for {0}/{1}: [{2}]: {3!r}'.format(
+                        len(r) - len(received), len(r),
+                        truncate(', '.join(waiting_for)), exc), '!',
+                )
+                self.fbi.diag(waiting_for)
+            except self.connerrors as exc:
+                self.speaker.beep()
+                marker('join: connection lost: {0!r}'.format(exc), '!')
+        raise StopSuite('Test failed: Missing task results')
+
+    def dump_progress(self):
+        return pstatus(self.progress) if self.progress else 'No test running'
+
+
+_creation_counter = count(0)
+def testcase(*groups):
+    if not groups:
+        raise ValueError('@testcase requires at least one group name')
+
+    def _mark_as_case(fun):
+        fun.__testgroup__ = groups
+        fun.__testsort__ = next(_creation_counter)
+        return fun
+
+    return _mark_as_case
+
+
+def _is_descriptor(obj, attr):
+    try:
+        cattr = getattr(obj.__class__, attr)
+    except AttributeError:
+        pass
+    else:
+        return not inspect.ismethod(cattr) and hasattr(cattr, '__get__')
+    return False
+
+
+class Suite(BaseSuite):
+
+    @testcase('all', 'green')
+    def manyshort(self):
+        self.join(group(add.s(i, i) for i in range(1000))(),
+                  timeout=10, propagate=True)
+
+    @testcase('all')
     def termbysig(self):
         self._evil_groupmember(kill)
 
+    @testcase('green')
     def group_with_exit(self):
         self._evil_groupmember(exiting)
 
-    def termbysegfault(self):
-        self._evil_groupmember(segfault)
-
+    @testcase('all')
     def timelimits(self):
         self._evil_groupmember(sleeping, 2, time_limit=1)
 
+    @testcase('all')
     def timelimits_soft(self):
         self._evil_groupmember(sleeping_ignore_limits, 2,
                                soft_time_limit=1, time_limit=1.1)
 
+    @testcase('all')
     def alwayskilled(self):
         g = group(kill.s() for _ in range(10))
         self.join(g(), timeout=10)
 
+    @testcase('all', 'green')
     def alwaysexits(self):
         g = group(exiting.s() for _ in range(10))
         self.join(g(), timeout=10)
@@ -243,6 +302,7 @@ class Suite(object):
         self.join(g1(), timeout=10)
         self.join(g2(), timeout=10)
 
+    @testcase('all', 'green')
     def bigtasksbigvalue(self):
         g = group(any_returning.s(BIG, sleep=0.3) for i in range(8))
         r = g()
@@ -255,15 +315,19 @@ class Suite(object):
             except NotImplementedError:
                 pass
 
+    @testcase('all', 'green')
     def bigtasks(self, wait=None):
         self._revoketerm(wait, False, False, BIG)
 
+    @testcase('all', 'green')
     def smalltasks(self, wait=None):
         self._revoketerm(wait, False, False, SMALL)
 
+    @testcase('all')
     def revoketermfast(self, wait=None):
         self._revoketerm(wait, True, False, SMALL)
 
+    @testcase('all')
     def revoketermslow(self, wait=5):
         self._revoketerm(wait, True, True, BIG)
 
@@ -276,35 +340,3 @@ class Suite(object):
                 sleep(random.choice(range(4)))
             r.revoke(terminate=True)
         self.join(r, timeout=10)
-
-    def missing_results(self, r):
-        return [res.id for res in r if res.id not in res.backend._cache]
-
-    def join(self, r, propagate=False, max_retries=10, **kwargs):
-        if self.no_join:
-            return
-        received = []
-
-        def on_result(task_id, value):
-            received.append(task_id)
-
-        for i in range(max_retries) if max_retries else count(0):
-            received[:] = []
-            try:
-                return r.get(callback=on_result, propagate=propagate, **kwargs)
-            except (socket.timeout, TimeoutError) as exc:
-                waiting_for = self.missing_results(r)
-                self.speaker.beep()
-                marker(
-                    'Still waiting for {0}/{1}: [{2}]: {3!r}'.format(
-                        len(r) - len(received), len(r),
-                        truncate(', '.join(waiting_for)), exc), '!',
-                )
-                self.fbi.diag(waiting_for)
-            except self.connerrors as exc:
-                self.speaker.beep()
-                marker('join: connection lost: {0!r}'.format(exc), '!')
-        raise StopSuite('Test failed: Missing task results')
-
-    def dump_progress(self):
-        return pstatus(self.progress) if self.progress else 'No test running'