suite.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. from __future__ import absolute_import, print_function, unicode_literals
  2. import platform
  3. import random
  4. from collections import namedtuple
  5. from itertools import count
  6. from time import time, sleep
  7. from kombu.utils.compat import OrderedDict
  8. from celery import group, VERSION_BANNER
  9. from celery.exceptions import TimeoutError
  10. from celery.five import range, values
  11. from celery.utils.debug import blockdetection
  12. from celery.utils.text import pluralize
  13. from celery.utils.timeutils import humanize_seconds
  14. from .app import (
  15. marker, _marker, add, any_, kill, sleeping,
  16. sleeping_ignore_limits, segfault,
  17. )
  18. from .data import BIG, SMALL
  19. BANNER = """\
  20. Celery stress-suite v{version}
  21. {platform}
  22. [config]
  23. .> broker: {conninfo}
  24. [toc: {total} {TESTS} total]
  25. {toc}
  26. """
  27. F_PROGRESS = """\
  28. {0.index}: {0.test.__name__}({0.iteration}/{0.total_iterations}) \
  29. rep#{0.repeats} elapsed:{since} \
  30. """
  31. Progress = namedtuple('Progress', (
  32. 'test', 'iteration', 'total_iterations',
  33. 'index', 'repeats', 'when', 'completed',
  34. ))
  35. def pstatus(p):
  36. return F_PROGRESS.format(
  37. p, since=humanize_seconds(time() - p.when, now='0 seconds'))
  38. class Suite(object):
  39. def __init__(self, app, block_timeout=30 * 60):
  40. self.app = app
  41. self.connerrors = self.app.connection().recoverable_connection_errors
  42. self.block_timeout = block_timeout
  43. self.progress = None
  44. self.tests = OrderedDict(
  45. (fun.__name__, fun) for fun in [
  46. self.manyshort,
  47. self.termbysig,
  48. self.bigtasks,
  49. self.smalltasks,
  50. self.timelimits,
  51. self.timelimits_soft,
  52. self.revoketermfast,
  53. self.revoketermslow,
  54. self.alwayskilled,
  55. ]
  56. )
  57. def run(self, names=None, iterations=50, offset=0,
  58. numtests=None, list_all=False, repeat=0, **kw):
  59. tests = self.filtertests(names)[offset:numtests or None]
  60. if list_all:
  61. return print(self.testlist(tests))
  62. print(self.banner(tests))
  63. it = count() if repeat == float('Inf') else range(int(repeat) + 1)
  64. for i in it:
  65. marker(
  66. 'Stresstest suite start (repetition {0})'.format(i + 1),
  67. '+',
  68. )
  69. for j, test in enumerate(tests):
  70. self.runtest(test, iterations, j + 1, i + 1)
  71. marker(
  72. 'Stresstest suite end (repetition {0})'.format(i + 1),
  73. '+',
  74. )
  75. def filtertests(self, names):
  76. try:
  77. return ([self.tests[n] for n in names] if names
  78. else list(values(self.tests)))
  79. except KeyError as exc:
  80. raise KeyError('Unknown test name: {0}'.format(exc))
  81. def testlist(self, tests):
  82. return ',\n'.join(
  83. '.> {0}) {1}'.format(i + 1, t.__name__)
  84. for i, t in enumerate(tests)
  85. )
  86. def banner(self, tests):
  87. app = self.app
  88. return BANNER.format(
  89. app='{0}:0x{1:x}'.format(app.main or '__main__', id(app)),
  90. version=VERSION_BANNER,
  91. conninfo=app.connection().as_uri(),
  92. platform=platform.platform(),
  93. toc=self.testlist(tests),
  94. TESTS=pluralize(len(tests), 'test'),
  95. total=len(tests),
  96. )
  97. def manyshort(self):
  98. self.join(group(add.s(i, i) for i in xrange(1000))(), propagate=True)
  99. def runtest(self, fun, n=50, index=0, repeats=1):
  100. with blockdetection(self.block_timeout):
  101. t = time()
  102. i = 0
  103. failed = False
  104. self.progress = Progress(fun, i, n, index, repeats, t, 0)
  105. _marker.delay(pstatus(self.progress))
  106. try:
  107. for i in range(n):
  108. self.progress = Progress(fun, i, n, index, repeats, t, 0)
  109. print(pstatus(self.progress), end=' ')
  110. try:
  111. fun()
  112. print('-> done')
  113. except Exception as exc:
  114. print('-> {0!r}'.format(exc))
  115. except Exception:
  116. failed = True
  117. raise
  118. finally:
  119. print('{0} {1} iterations in {2}s'.format(
  120. 'failed after' if failed else 'completed',
  121. i + 1, humanize_seconds(time() - t),
  122. ))
  123. if not failed:
  124. self.progress = Progress(fun, i, n, index, repeats, t, 1)
  125. def termbysig(self):
  126. self._evil_groupmember(kill)
  127. def termbysegfault(self):
  128. self._evil_groupmember(segfault)
  129. def timelimits(self):
  130. self._evil_groupmember(sleeping, 2, timeout=1)
  131. def timelimits_soft(self):
  132. self._evil_groupmember(sleeping_ignore_limits, 2,
  133. soft_timeout=1, timeout=1.1)
  134. def alwayskilled(self):
  135. g = group(kill.s() for _ in range(10))
  136. self.join(g(), timeout=10)
  137. def _evil_groupmember(self, evil_t, *eargs, **opts):
  138. g1 = group(add.s(2, 2).set(**opts), evil_t.s(*eargs).set(**opts),
  139. add.s(4, 4).set(**opts), add.s(8, 8).set(**opts))
  140. g2 = group(add.s(3, 3).set(**opts), add.s(5, 5).set(**opts),
  141. evil_t.s(*eargs).set(**opts), add.s(7, 7).set(**opts))
  142. self.join(g1(), timeout=10)
  143. self.join(g2(), timeout=10)
  144. def bigtasks(self, wait=None):
  145. self._revoketerm(wait, False, False, BIG)
  146. def smalltasks(self, wait=None):
  147. self._revoketerm(wait, False, False, SMALL)
  148. def revoketermfast(self, wait=None):
  149. self._revoketerm(wait, True, False, SMALL)
  150. def revoketermslow(self, wait=5):
  151. self._revoketerm(wait, True, True, BIG)
  152. def _revoketerm(self, wait=None, terminate=True,
  153. joindelay=True, data=BIG):
  154. g = group(any_.s(data, sleep=wait) for i in range(8))
  155. r = g()
  156. if terminate:
  157. if joindelay:
  158. sleep(random.choice(range(4)))
  159. r.revoke(terminate=True)
  160. self.join(r, timeout=5)
  161. def missing_results(self, r):
  162. return [res.id for res in r if res.id not in res.backend._cache]
  163. def join(self, r, propagate=False, **kwargs):
  164. while 1:
  165. try:
  166. return r.get(propagate=propagate, **kwargs)
  167. except TimeoutError as exc:
  168. marker(
  169. 'Still waiting for {0!r}: {1!r}'.format(
  170. self.missing_results(r), exc), '!')
  171. except self.connerrors as exc:
  172. marker('join: connection lost: {0!r}'.format(exc), '!')
  173. def dump_progress(self):
  174. return pstatus(self.progress) if self.progress else 'No test running'