suite.py 8.2 KB


  1. from __future__ import absolute_import, print_function, unicode_literals
  2. import platform
  3. import random
  4. import socket
  5. from collections import namedtuple
  6. from itertools import count
  7. from time import sleep
  8. from kombu.utils.compat import OrderedDict
  9. from celery import group, VERSION_BANNER
  10. from celery.exceptions import TimeoutError
  11. from celery.five import range, values, monotonic
  12. from celery.utils.debug import blockdetection
  13. from celery.utils.text import pluralize
  14. from celery.utils.timeutils import humanize_seconds
  15. from .app import (
  16. marker, _marker, add, any_, exiting, kill, sleeping,
  17. sleeping_ignore_limits, segfault,
  18. )
  19. from .data import BIG, SMALL
  20. BANNER = """\
  21. Celery stress-suite v{version}
  22. {platform}
  23. [config]
  24. .> broker: {conninfo}
  25. [toc: {total} {TESTS} total]
  26. {toc}
  27. """
  28. F_PROGRESS = """\
  29. {0.index}: {0.test.__name__}({0.iteration}/{0.total_iterations}) \
  30. rep#{0.repeats} runtime: {runtime}/{elapsed} \
  31. """
  32. Progress = namedtuple('Progress', (
  33. 'test', 'iteration', 'total_iterations',
  34. 'index', 'repeats', 'runtime', 'elapsed', 'completed',
  35. ))
  36. class StopSuite(Exception):
  37. pass
  38. def pstatus(p):
  39. return F_PROGRESS.format(
  40. p,
  41. runtime=humanize_seconds(monotonic() - p.runtime, now='0 seconds'),
  42. elapsed=humanize_seconds(monotonic() - p.elapsed, now='0 seconds'),
  43. )
  44. def testgroup(*funs):
  45. return OrderedDict((fun.__name__, fun) for fun in funs)
  46. class Suite(object):
  47. def __init__(self, app, block_timeout=30 * 60):
  48. self.app = app
  49. self.connerrors = self.app.connection().recoverable_connection_errors
  50. self.block_timeout = block_timeout
  51. self.progress = None
  52. self.groups = {
  53. 'all': testgroup(
  54. self.manyshort,
  55. self.termbysig,
  56. self.bigtasks,
  57. self.smalltasks,
  58. self.timelimits,
  59. self.timelimits_soft,
  60. self.revoketermfast,
  61. self.revoketermslow,
  62. self.alwayskilled,
  63. self.alwaysexits,
  64. ),
  65. 'green': testgroup(
  66. self.manyshort,
  67. self.bigtasks,
  68. self.smalltasks,
  69. self.alwaysexits,
  70. self.group_with_exit,
  71. ),
  72. }
  73. def run(self, names=None, iterations=50, offset=0,
  74. numtests=None, list_all=False, repeat=0, group='all', **kw):
  75. tests = self.filtertests(group, names)[offset:numtests or None]
  76. if list_all:
  77. return print(self.testlist(tests))
  78. print(self.banner(tests))
  79. it = count() if repeat == float('Inf') else range(int(repeat) + 1)
  80. for i in it:
  81. marker(
  82. 'Stresstest suite start (repetition {0})'.format(i + 1),
  83. '+',
  84. )
  85. for j, test in enumerate(tests):
  86. self.runtest(test, iterations, j + 1, i + 1)
  87. marker(
  88. 'Stresstest suite end (repetition {0})'.format(i + 1),
  89. '+',
  90. )
  91. def filtertests(self, group, names):
  92. tests = self.groups[group]
  93. try:
  94. return ([tests[n] for n in names] if names
  95. else list(values(tests)))
  96. except KeyError as exc:
  97. raise KeyError('Unknown test name: {0}'.format(exc))
  98. def testlist(self, tests):
  99. return ',\n'.join(
  100. '.> {0}) {1}'.format(i + 1, t.__name__)
  101. for i, t in enumerate(tests)
  102. )
  103. def banner(self, tests):
  104. app = self.app
  105. return BANNER.format(
  106. app='{0}:0x{1:x}'.format(app.main or '__main__', id(app)),
  107. version=VERSION_BANNER,
  108. conninfo=app.connection().as_uri(),
  109. platform=platform.platform(),
  110. toc=self.testlist(tests),
  111. TESTS=pluralize(len(tests), 'test'),
  112. total=len(tests),
  113. )
  114. def manyshort(self):
  115. self.join(group(add.s(i, i) for i in range(1000))(), timeout=10, propagate=True)
  116. def runtest(self, fun, n=50, index=0, repeats=1):
  117. print('{0}: [[[{1}({2})]]]'.format(repeats, fun.__name__, n))
  118. with blockdetection(self.block_timeout):
  119. runtime = elapsed = monotonic()
  120. i = 0
  121. failed = False
  122. self.progress = Progress(
  123. fun, i, n, index, repeats, elapsed, runtime, 0,
  124. )
  125. _marker.delay(pstatus(self.progress))
  126. try:
  127. for i in range(n):
  128. runtime = monotonic()
  129. self.progress = Progress(
  130. fun, i + 1, n, index, repeats, runtime, elapsed, 0,
  131. )
  132. try:
  133. fun()
  134. except StopSuite:
  135. raise
  136. except Exception as exc:
  137. print('-> {0!r}'.format(exc))
  138. print(pstatus(self.progress))
  139. else:
  140. print(pstatus(self.progress))
  141. except Exception:
  142. failed = True
  143. raise
  144. finally:
  145. print('{0} {1} iterations in {2}s'.format(
  146. 'failed after' if failed else 'completed',
  147. i + 1, humanize_seconds(monotonic() - elapsed),
  148. ))
  149. if not failed:
  150. self.progress = Progress(
  151. fun, i + 1, n, index, repeats, runtime, elapsed, 1,
  152. )
  153. def termbysig(self):
  154. self._evil_groupmember(kill)
  155. def group_with_exit(self):
  156. self._evil_groupmember(exiting)
  157. def termbysegfault(self):
  158. self._evil_groupmember(segfault)
  159. def timelimits(self):
  160. self._evil_groupmember(sleeping, 2, timeout=1)
  161. def timelimits_soft(self):
  162. self._evil_groupmember(sleeping_ignore_limits, 2,
  163. soft_timeout=1, timeout=1.1)
  164. def alwayskilled(self):
  165. g = group(kill.s() for _ in range(10))
  166. self.join(g(), timeout=10)
  167. def alwaysexits(self):
  168. g = group(exiting.s() for _ in range(10))
  169. self.join(g(), timeout=10)
  170. def _evil_groupmember(self, evil_t, *eargs, **opts):
  171. g1 = group(add.s(2, 2).set(**opts), evil_t.s(*eargs).set(**opts),
  172. add.s(4, 4).set(**opts), add.s(8, 8).set(**opts))
  173. g2 = group(add.s(3, 3).set(**opts), add.s(5, 5).set(**opts),
  174. evil_t.s(*eargs).set(**opts), add.s(7, 7).set(**opts))
  175. self.join(g1(), timeout=10)
  176. self.join(g2(), timeout=10)
  177. def bigtasks(self, wait=None):
  178. self._revoketerm(wait, False, False, BIG)
  179. def smalltasks(self, wait=None):
  180. self._revoketerm(wait, False, False, SMALL)
  181. def revoketermfast(self, wait=None):
  182. self._revoketerm(wait, True, False, SMALL)
  183. def revoketermslow(self, wait=5):
  184. self._revoketerm(wait, True, True, BIG)
  185. def _revoketerm(self, wait=None, terminate=True,
  186. joindelay=True, data=BIG):
  187. g = group(any_.s(data, sleep=wait) for i in range(8))
  188. r = g()
  189. if terminate:
  190. if joindelay:
  191. sleep(random.choice(range(4)))
  192. r.revoke(terminate=True)
  193. self.join(r, timeout=5)
  194. def missing_results(self, r):
  195. return [res.id for res in r if res.id not in res.backend._cache]
  196. def join(self, r, propagate=False, max_retries=1, **kwargs):
  197. received = []
  198. def on_result(task_id, value):
  199. received.append(task_id)
  200. for i in range(max_retries) if max_retries else count(0):
  201. received[:] = []
  202. try:
  203. return r.get(callback=on_result, propagate=propagate, **kwargs)
  204. except (socket.timeout, TimeoutError) as exc:
  205. waiting_for = self.missing_results(r)
  206. marker(
  207. 'Still waiting for (0) [{1}]: {2!r}'.format(
  208. len(waiting_for), ','.join(waiting_for), exc), '!',
  209. )
  210. except self.connerrors as exc:
  211. marker('join: connection lost: {0!r}'.format(exc), '!')
  212. raise StopSuite('Test failed: Missing task results')
  213. def dump_progress(self):
  214. return pstatus(self.progress) if self.progress else 'No test running'