suite.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. from __future__ import absolute_import, print_function, unicode_literals
  2. import platform
  3. import random
  4. import socket
  5. import sys
  6. from collections import namedtuple
  7. from itertools import count
  8. from time import sleep
  9. from kombu.utils.compat import OrderedDict
  10. from celery import group, VERSION_BANNER
  11. from celery.exceptions import TimeoutError
  12. from celery.five import range, values, monotonic
  13. from celery.utils.debug import blockdetection
  14. from celery.utils.text import pluralize
  15. from celery.utils.timeutils import humanize_seconds
  16. from .app import (
  17. marker, _marker, add, any_, exiting, kill, sleeping,
  18. sleeping_ignore_limits, segfault,
  19. )
  20. from .data import BIG, SMALL
  21. from .fbi import FBI
# Start-up banner template.  ``Suite.banner`` fills it in with
# ``str.format`` (version, platform, conninfo, total, TESTS, toc).
BANNER = """\
Celery stress-suite v{version}
{platform}
[config]
.> broker: {conninfo}
[toc: {total} {TESTS} total]
{toc}
"""
# One-line progress template used by ``pstatus``.  Positional argument 0 is
# a ``Progress`` tuple; ``runtime`` and ``elapsed`` are passed as
# already-humanized strings.  The trailing backslashes join the three
# source lines into a single output line.
F_PROGRESS = """\
{0.index}: {0.test.__name__}({0.iteration}/{0.total_iterations}) \
rep#{0.repeats} runtime: {runtime}/{elapsed} \
"""
# Snapshot of the test currently being executed by ``Suite.runtest``.
# ``runtime`` and ``elapsed`` hold ``monotonic()`` timestamps (start of the
# current iteration / start of the whole test), not durations; ``completed``
# is 0 while the test is running and 1 once it finished without failing.
Progress = namedtuple('Progress', (
    'test', 'iteration', 'total_iterations',
    'index', 'repeats', 'runtime', 'elapsed', 'completed',
))
# Sentinel compared against ``repeat`` in ``Suite.run`` to request an
# endless number of repetitions.
Inf = float('Inf')
  39. class StopSuite(Exception):
  40. pass
  41. def pstatus(p):
  42. return F_PROGRESS.format(
  43. p,
  44. runtime=humanize_seconds(monotonic() - p.runtime, now='0 seconds'),
  45. elapsed=humanize_seconds(monotonic() - p.elapsed, now='0 seconds'),
  46. )
  47. class Speaker(object):
  48. def __init__(self, gap=5.0):
  49. self.gap = gap
  50. self.last_noise = monotonic() - self.gap * 2
  51. def beep(self):
  52. now = monotonic()
  53. if now - self.last_noise >= self.gap:
  54. self.emit()
  55. self.last_noise = now
  56. def emit(self):
  57. print('\a', file=sys.stderr, end='')
  58. def testgroup(*funs):
  59. return OrderedDict((fun.__name__, fun) for fun in funs)
  60. class Suite(object):
  61. def __init__(self, app, block_timeout=30 * 60):
  62. self.app = app
  63. self.connerrors = self.app.connection().recoverable_connection_errors
  64. self.block_timeout = block_timeout
  65. self.progress = None
  66. self.speaker = Speaker()
  67. self.fbi = FBI(app)
  68. self.groups = {
  69. 'all': testgroup(
  70. self.manyshort,
  71. self.termbysig,
  72. self.bigtasks,
  73. self.smalltasks,
  74. self.timelimits,
  75. self.timelimits_soft,
  76. self.revoketermfast,
  77. self.revoketermslow,
  78. self.alwayskilled,
  79. self.alwaysexits,
  80. ),
  81. 'green': testgroup(
  82. self.manyshort,
  83. self.bigtasks,
  84. self.smalltasks,
  85. self.alwaysexits,
  86. self.group_with_exit,
  87. ),
  88. }
  89. def run(self, names=None, iterations=50, offset=0,
  90. numtests=None, list_all=False, repeat=0, group='all',
  91. diag=False, no_join=False, **kw):
  92. self.no_join = no_join
  93. self.fbi.enable(diag)
  94. tests = self.filtertests(group, names)[offset:numtests or None]
  95. if list_all:
  96. return print(self.testlist(tests))
  97. print(self.banner(tests))
  98. print('+ Enabling events')
  99. self.app.control.enable_events()
  100. it = count() if repeat == Inf else range(int(repeat) or 1)
  101. for i in it:
  102. marker(
  103. 'Stresstest suite start (repetition {0})'.format(i + 1),
  104. '+',
  105. )
  106. for j, test in enumerate(tests):
  107. self.runtest(test, iterations, j + 1, i + 1)
  108. marker(
  109. 'Stresstest suite end (repetition {0})'.format(i + 1),
  110. '+',
  111. )
  112. def filtertests(self, group, names):
  113. tests = self.groups[group]
  114. try:
  115. return ([tests[n] for n in names] if names
  116. else list(values(tests)))
  117. except KeyError as exc:
  118. raise KeyError('Unknown test name: {0}'.format(exc))
  119. def testlist(self, tests):
  120. return ',\n'.join(
  121. '.> {0}) {1}'.format(i + 1, t.__name__)
  122. for i, t in enumerate(tests)
  123. )
  124. def banner(self, tests):
  125. app = self.app
  126. return BANNER.format(
  127. app='{0}:0x{1:x}'.format(app.main or '__main__', id(app)),
  128. version=VERSION_BANNER,
  129. conninfo=app.connection().as_uri(),
  130. platform=platform.platform(),
  131. toc=self.testlist(tests),
  132. TESTS=pluralize(len(tests), 'test'),
  133. total=len(tests),
  134. )
  135. def manyshort(self):
  136. self.join(group(add.s(i, i) for i in range(1000))(),
  137. timeout=10, propagate=True)
  138. def runtest(self, fun, n=50, index=0, repeats=1):
  139. print('{0}: [[[{1}({2})]]]'.format(repeats, fun.__name__, n))
  140. with blockdetection(self.block_timeout):
  141. with self.fbi.investigation():
  142. runtime = elapsed = monotonic()
  143. i = 0
  144. failed = False
  145. self.progress = Progress(
  146. fun, i, n, index, repeats, elapsed, runtime, 0,
  147. )
  148. _marker.delay(pstatus(self.progress))
  149. try:
  150. for i in range(n):
  151. runtime = monotonic()
  152. self.progress = Progress(
  153. fun, i + 1, n, index, repeats, runtime, elapsed, 0,
  154. )
  155. try:
  156. fun()
  157. except StopSuite:
  158. raise
  159. except Exception as exc:
  160. print('-> {0!r}'.format(exc))
  161. print(pstatus(self.progress))
  162. else:
  163. print(pstatus(self.progress))
  164. except Exception:
  165. failed = True
  166. self.speaker.beep()
  167. raise
  168. finally:
  169. print('{0} {1} iterations in {2}s'.format(
  170. 'failed after' if failed else 'completed',
  171. i + 1, humanize_seconds(monotonic() - elapsed),
  172. ))
  173. if not failed:
  174. self.progress = Progress(
  175. fun, i + 1, n, index, repeats, runtime, elapsed, 1,
  176. )
  177. def termbysig(self):
  178. self._evil_groupmember(kill)
  179. def group_with_exit(self):
  180. self._evil_groupmember(exiting)
  181. def termbysegfault(self):
  182. self._evil_groupmember(segfault)
  183. def timelimits(self):
  184. self._evil_groupmember(sleeping, 2, timeout=1)
  185. def timelimits_soft(self):
  186. self._evil_groupmember(sleeping_ignore_limits, 2,
  187. soft_timeout=1, timeout=1.1)
  188. def alwayskilled(self):
  189. g = group(kill.s() for _ in range(10))
  190. self.join(g(), timeout=10)
  191. def alwaysexits(self):
  192. g = group(exiting.s() for _ in range(10))
  193. self.join(g(), timeout=10)
  194. def _evil_groupmember(self, evil_t, *eargs, **opts):
  195. g1 = group(add.s(2, 2).set(**opts), evil_t.s(*eargs).set(**opts),
  196. add.s(4, 4).set(**opts), add.s(8, 8).set(**opts))
  197. g2 = group(add.s(3, 3).set(**opts), add.s(5, 5).set(**opts),
  198. evil_t.s(*eargs).set(**opts), add.s(7, 7).set(**opts))
  199. self.join(g1(), timeout=10)
  200. self.join(g2(), timeout=10)
  201. def bigtasks(self, wait=None):
  202. self._revoketerm(wait, False, False, BIG)
  203. def smalltasks(self, wait=None):
  204. self._revoketerm(wait, False, False, SMALL)
  205. def revoketermfast(self, wait=None):
  206. self._revoketerm(wait, True, False, SMALL)
  207. def revoketermslow(self, wait=5):
  208. self._revoketerm(wait, True, True, BIG)
  209. def _revoketerm(self, wait=None, terminate=True,
  210. joindelay=True, data=BIG):
  211. g = group(any_.s(data, sleep=wait) for i in range(8))
  212. r = g()
  213. if terminate:
  214. if joindelay:
  215. sleep(random.choice(range(4)))
  216. r.revoke(terminate=True)
  217. self.join(r, timeout=5)
  218. def missing_results(self, r):
  219. return [res.id for res in r if res.id not in res.backend._cache]
  220. def join(self, r, propagate=False, max_retries=5, **kwargs):
  221. if self.no_join:
  222. return
  223. received = []
  224. def on_result(task_id, value):
  225. received.append(task_id)
  226. for i in range(max_retries) if max_retries else count(0):
  227. received[:] = []
  228. try:
  229. return r.get(callback=on_result, propagate=propagate, **kwargs)
  230. except (socket.timeout, TimeoutError) as exc:
  231. waiting_for = self.missing_results(r)
  232. self.speaker.beep()
  233. marker(
  234. 'Still waiting for {0}/{1}: [{2}]: {3!r}'.format(
  235. len(r) - len(received), len(r),
  236. ','.join(waiting_for), exc), '!',
  237. )
  238. self.fbi.diag(waiting_for)
  239. except self.connerrors as exc:
  240. self.speaker.beep()
  241. marker('join: connection lost: {0!r}'.format(exc), '!')
  242. raise StopSuite('Test failed: Missing task results')
  243. def dump_progress(self):
  244. return pstatus(self.progress) if self.progress else 'No test running'