suite.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. from __future__ import absolute_import, print_function
  2. import platform
  3. import random
  4. from itertools import count
  5. from time import time, sleep
  6. from kombu.utils.compat import OrderedDict
  7. from celery import group, VERSION_BANNER
  8. from celery.exceptions import TimeoutError
  9. from celery.five import range, values
  10. from celery.utils.debug import blockdetection
  11. from celery.utils.text import pluralize
  12. from celery.utils.timeutils import humanize_seconds
  13. from .app import (
  14. marker, add, any_, kill, sleeping,
  15. sleeping_ignore_limits, segfault,
  16. )
  17. from .data import BIG, SMALL
# Start-up banner template, rendered with ``str.format`` by ``Suite.banner``.
# Fields consumed: {version}, {platform}, {conninfo}, {total}, {TESTS}, {toc}.
BANNER = """\
Celery stress-suite v{version}
{platform}
[config]
.> broker: {conninfo}
[toc: {total} {TESTS} total]
{toc}
"""
  26. class Suite(object):
  27. def __init__(self, app, block_timeout=30 * 60):
  28. self.app = app
  29. self.connerrors = self.app.connection().recoverable_connection_errors
  30. self.block_timeout = block_timeout
  31. self.tests = OrderedDict(
  32. (fun.__name__, fun) for fun in [
  33. self.manyshort,
  34. self.termbysig,
  35. self.bigtasks,
  36. self.smalltasks,
  37. self.timelimits,
  38. self.timelimits_soft,
  39. self.revoketermfast,
  40. self.revoketermslow,
  41. self.alwayskilled,
  42. ]
  43. )
  44. def run(self, names=None, iterations=50, offset=0,
  45. numtests=None, list_all=False, repeat=0, **kw):
  46. tests = self.filtertests(names)[offset:numtests or None]
  47. if list_all:
  48. return print(self.testlist(tests))
  49. print(self.banner(tests))
  50. it = count() if repeat == float('Inf') else range(int(repeat) + 1)
  51. for i in it:
  52. marker(
  53. 'Stresstest suite start (repetition {0})'.format(i + 1),
  54. '+',
  55. )
  56. for j, test in enumerate(tests):
  57. self.runtest(test, iterations, j + 1)
  58. marker(
  59. 'Stresstest suite end (repetition {0})'.format(i + 1),
  60. '+',
  61. )
  62. def filtertests(self, names):
  63. try:
  64. return ([self.tests[n] for n in names] if names
  65. else list(values(self.tests)))
  66. except KeyError as exc:
  67. raise KeyError('Unknown test name: {0}'.format(exc))
  68. def testlist(self, tests):
  69. return ',\n'.join(
  70. '.> {0}) {1}'.format(i + 1, t.__name__)
  71. for i, t in enumerate(tests)
  72. )
  73. def banner(self, tests):
  74. app = self.app
  75. return BANNER.format(
  76. app='{0}:0x{1:x}'.format(app.main or '__main__', id(app)),
  77. version=VERSION_BANNER,
  78. conninfo=app.connection().as_uri(),
  79. platform=platform.platform(),
  80. toc=self.testlist(tests),
  81. TESTS=pluralize(len(tests), 'test'),
  82. total=len(tests),
  83. )
  84. def manyshort(self):
  85. self.join(group(add.s(i, i) for i in xrange(1000))(), propagate=True)
  86. def runtest(self, fun, n=50, index=0):
  87. with blockdetection(self.block_timeout):
  88. t = time()
  89. i = 0
  90. failed = False
  91. marker('{0}: {1}({2})'.format(index, fun.__name__, n))
  92. try:
  93. for i in range(n):
  94. print('{0} ({1})'.format(i, fun.__name__), end=' ')
  95. try:
  96. fun()
  97. print('-> done')
  98. except Exception as exc:
  99. print('-> {}'.format(exc))
  100. except Exception:
  101. failed = True
  102. raise
  103. finally:
  104. print('{0} {1} iterations in {2}s'.format(
  105. 'failed after' if failed else 'completed',
  106. i + 1, humanize_seconds(time() - t),
  107. ))
  108. def termbysig(self):
  109. self._evil_groupmember(kill)
  110. def termbysegfault(self):
  111. self._evil_groupmember(segfault)
  112. def timelimits(self):
  113. self._evil_groupmember(sleeping, 2, timeout=1)
  114. def timelimits_soft(self):
  115. self._evil_groupmember(sleeping_ignore_limits, 2,
  116. soft_timeout=1, timeout=1.1)
  117. def alwayskilled(self):
  118. g = group(kill.s() for _ in range(10))
  119. self.join(g(), timeout=10)
  120. def _evil_groupmember(self, evil_t, *eargs, **opts):
  121. g1 = group(add.s(2, 2).set(**opts), evil_t.s(*eargs).set(**opts),
  122. add.s(4, 4).set(**opts), add.s(8, 8).set(**opts))
  123. g2 = group(add.s(3, 3).set(**opts), add.s(5, 5).set(**opts),
  124. evil_t.s(*eargs).set(**opts), add.s(7, 7).set(**opts))
  125. self.join(g1(), timeout=10)
  126. self.join(g2(), timeout=10)
  127. def bigtasks(self, wait=None):
  128. self._revoketerm(wait, False, False, BIG)
  129. def smalltasks(self, wait=None):
  130. self._revoketerm(wait, False, False, SMALL)
  131. def revoketermfast(self, wait=None):
  132. self._revoketerm(wait, True, False, SMALL)
  133. def revoketermslow(self, wait=5):
  134. self._revoketerm(wait, True, True, BIG)
  135. def _revoketerm(self, wait=None, terminate=True,
  136. joindelay=True, data=BIG):
  137. g = group(any_.s(data, sleep=wait) for i in range(8))
  138. r = g()
  139. if terminate:
  140. if joindelay:
  141. sleep(random.choice(range(4)))
  142. r.revoke(terminate=True)
  143. self.join(r, timeout=5)
  144. def missing_results(self, r):
  145. return [result.id for result in r if result.id not in result.backend._cache]
  146. def join(self, r, propagate=False, **kwargs):
  147. while 1:
  148. try:
  149. return r.get(propagate=propagate, **kwargs)
  150. except TimeoutError as exc:
  151. marker(
  152. 'Still waiting for {0!r}: {1!r}'.format(
  153. self.missing_results(r), exc), '!')
  154. except self.connerrors as exc:
  155. marker('join: connection lost: {0!r}'.format(exc), '!')