suite.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. from __future__ import absolute_import, print_function, unicode_literals
  2. import inspect
  3. import platform
  4. import random
  5. import socket
  6. import sys
  7. from collections import OrderedDict, defaultdict, namedtuple
  8. from itertools import count
  9. from time import sleep
  10. from celery import VERSION_BANNER, chain, group, uuid
  11. from celery.exceptions import TimeoutError
  12. from celery.five import items, monotonic, range, values
  13. from celery.utils.debug import blockdetection
  14. from celery.utils.text import pluralize, truncate
  15. from celery.utils.timeutils import humanize_seconds
  16. from .app import (
  17. marker, _marker, add, any_, collect_ids, exiting, ids, kill, sleeping,
  18. sleeping_ignore_limits, any_returning, print_unicode,
  19. )
  20. from .data import BIG, SMALL
  21. from .fbi import FBI
  22. BANNER = """\
  23. Celery stress-suite v{version}
  24. {platform}
  25. [config]
  26. .> broker: {conninfo}
  27. [toc: {total} {TESTS} total]
  28. {toc}
  29. """
  30. F_PROGRESS = """\
  31. {0.index}: {0.test.__name__}({0.iteration}/{0.total_iterations}) \
  32. rep#{0.repeats} runtime: {runtime}/{elapsed} \
  33. """
  34. Progress = namedtuple('Progress', (
  35. 'test', 'iteration', 'total_iterations',
  36. 'index', 'repeats', 'runtime', 'elapsed', 'completed',
  37. ))
  38. Inf = float('Inf')
  39. def assert_equal(a, b):
  40. assert a == b, '{0!r} != {1!r}'.format(a, b)
  41. class StopSuite(Exception):
  42. pass
  43. def pstatus(p):
  44. return F_PROGRESS.format(
  45. p,
  46. runtime=humanize_seconds(monotonic() - p.runtime, now='0 seconds'),
  47. elapsed=humanize_seconds(monotonic() - p.elapsed, now='0 seconds'),
  48. )
  49. class Speaker(object):
  50. def __init__(self, gap=5.0):
  51. self.gap = gap
  52. self.last_noise = monotonic() - self.gap * 2
  53. def beep(self):
  54. now = monotonic()
  55. if now - self.last_noise >= self.gap:
  56. self.emit()
  57. self.last_noise = now
  58. def emit(self):
  59. print('\a', file=sys.stderr, end='')
  60. def testgroup(*funs):
  61. return OrderedDict((fun.__name__, fun) for fun in funs)
  62. class BaseSuite(object):
  63. def __init__(self, app, block_timeout=30 * 60):
  64. self.app = app
  65. self.connerrors = self.app.connection().recoverable_connection_errors
  66. self.block_timeout = block_timeout
  67. self.progress = None
  68. self.speaker = Speaker()
  69. self.fbi = FBI(app)
  70. self.init_groups()
  71. def init_groups(self):
  72. acc = defaultdict(list)
  73. for attr in dir(self):
  74. if not _is_descriptor(self, attr):
  75. meth = getattr(self, attr)
  76. try:
  77. groups = meth.__func__.__testgroup__
  78. except AttributeError:
  79. pass
  80. else:
  81. for g in groups:
  82. acc[g].append(meth)
  83. # sort the tests by the order in which they are defined in the class
  84. for g in values(acc):
  85. g[:] = sorted(g, key=lambda m: m.__func__.__testsort__)
  86. self.groups = dict(
  87. (name, testgroup(*tests)) for name, tests in items(acc)
  88. )
  89. def run(self, names=None, iterations=50, offset=0,
  90. numtests=None, list_all=False, repeat=0, group='all',
  91. diag=False, no_join=False, **kw):
  92. self.no_join = no_join
  93. self.fbi.enable(diag)
  94. tests = self.filtertests(group, names)[offset:numtests or None]
  95. if list_all:
  96. return print(self.testlist(tests))
  97. print(self.banner(tests))
  98. print('+ Enabling events')
  99. self.app.control.enable_events()
  100. it = count() if repeat == Inf else range(int(repeat) or 1)
  101. for i in it:
  102. marker(
  103. 'Stresstest suite start (repetition {0})'.format(i + 1),
  104. '+',
  105. )
  106. for j, test in enumerate(tests):
  107. self.runtest(test, iterations, j + 1, i + 1)
  108. marker(
  109. 'Stresstest suite end (repetition {0})'.format(i + 1),
  110. '+',
  111. )
  112. def filtertests(self, group, names):
  113. tests = self.groups[group]
  114. try:
  115. return ([tests[n] for n in names] if names
  116. else list(values(tests)))
  117. except KeyError as exc:
  118. raise KeyError('Unknown test name: {0}'.format(exc))
  119. def testlist(self, tests):
  120. return ',\n'.join(
  121. '.> {0}) {1}'.format(i + 1, t.__name__)
  122. for i, t in enumerate(tests)
  123. )
  124. def banner(self, tests):
  125. app = self.app
  126. return BANNER.format(
  127. app='{0}:0x{1:x}'.format(app.main or '__main__', id(app)),
  128. version=VERSION_BANNER,
  129. conninfo=app.connection().as_uri(),
  130. platform=platform.platform(),
  131. toc=self.testlist(tests),
  132. TESTS=pluralize(len(tests), 'test'),
  133. total=len(tests),
  134. )
  135. def runtest(self, fun, n=50, index=0, repeats=1):
  136. n = getattr(fun, '__iterations__', None) or n
  137. print('{0}: [[[{1}({2})]]]'.format(repeats, fun.__name__, n))
  138. with blockdetection(self.block_timeout):
  139. with self.fbi.investigation():
  140. runtime = elapsed = monotonic()
  141. i = 0
  142. failed = False
  143. self.progress = Progress(
  144. fun, i, n, index, repeats, elapsed, runtime, 0,
  145. )
  146. _marker.delay(pstatus(self.progress))
  147. try:
  148. for i in range(n):
  149. runtime = monotonic()
  150. self.progress = Progress(
  151. fun, i + 1, n, index, repeats, runtime, elapsed, 0,
  152. )
  153. try:
  154. fun()
  155. except StopSuite:
  156. raise
  157. except Exception as exc:
  158. print('-> {0!r}'.format(exc))
  159. import traceback
  160. print(traceback.format_exc())
  161. print(pstatus(self.progress))
  162. else:
  163. print(pstatus(self.progress))
  164. except Exception:
  165. failed = True
  166. self.speaker.beep()
  167. raise
  168. finally:
  169. print('{0} {1} iterations in {2}'.format(
  170. 'failed after' if failed else 'completed',
  171. i + 1, humanize_seconds(monotonic() - elapsed),
  172. ))
  173. if not failed:
  174. self.progress = Progress(
  175. fun, i + 1, n, index, repeats, runtime, elapsed, 1,
  176. )
  177. def missing_results(self, r):
  178. return [res.id for res in r if res.id not in res.backend._cache]
  179. def join(self, r, propagate=False, max_retries=10, **kwargs):
  180. if self.no_join:
  181. return
  182. received = []
  183. def on_result(task_id, value):
  184. received.append(task_id)
  185. for i in range(max_retries) if max_retries else count(0):
  186. received[:] = []
  187. try:
  188. return r.get(callback=on_result, propagate=propagate, **kwargs)
  189. except (socket.timeout, TimeoutError) as exc:
  190. waiting_for = self.missing_results(r)
  191. self.speaker.beep()
  192. marker(
  193. 'Still waiting for {0}/{1}: [{2}]: {3!r}'.format(
  194. len(r) - len(received), len(r),
  195. truncate(', '.join(waiting_for)), exc), '!',
  196. )
  197. self.fbi.diag(waiting_for)
  198. except self.connerrors as exc:
  199. self.speaker.beep()
  200. marker('join: connection lost: {0!r}'.format(exc), '!')
  201. raise StopSuite('Test failed: Missing task results')
  202. def dump_progress(self):
  203. return pstatus(self.progress) if self.progress else 'No test running'
  204. _creation_counter = count(0)
  205. def testcase(*groups, **kwargs):
  206. if not groups:
  207. raise ValueError('@testcase requires at least one group name')
  208. def _mark_as_case(fun):
  209. fun.__testgroup__ = groups
  210. fun.__testsort__ = next(_creation_counter)
  211. fun.__iterations__ = kwargs.get('iterations')
  212. return fun
  213. return _mark_as_case
  214. def _is_descriptor(obj, attr):
  215. try:
  216. cattr = getattr(obj.__class__, attr)
  217. except AttributeError:
  218. pass
  219. else:
  220. return not inspect.ismethod(cattr) and hasattr(cattr, '__get__')
  221. return False
  222. class Suite(BaseSuite):
  223. @testcase('all', 'green', iterations=1)
  224. def chain(self):
  225. c = add.s(4, 4) | add.s(8) | add.s(16)
  226. assert_equal(self.join(c()), 32)
  227. @testcase('all', 'green', iterations=1)
  228. def chaincomplex(self):
  229. c = (
  230. add.s(2, 2) | (
  231. add.s(4) | add.s(8) | add.s(16)
  232. ) |
  233. group(add.s(i) for i in range(4))
  234. )
  235. res = c()
  236. assert_equal(res.get(), [32, 33, 34, 35])
  237. @testcase('all', 'green', iterations=1)
  238. def parentids_chain(self, num=248):
  239. c = chain(ids.si(i) for i in range(num))
  240. c.freeze()
  241. res = c()
  242. res.get(timeout=5)
  243. self.assert_ids(res, num - 1)
  244. @testcase('all', 'green', iterations=1)
  245. def parentids_group(self):
  246. g = ids.si(1) | ids.si(2) | group(ids.si(i) for i in range(2, 50))
  247. res = g()
  248. expected_root_id = res.parent.parent.id
  249. expected_parent_id = res.parent.id
  250. values = res.get(timeout=5)
  251. for i, r in enumerate(values):
  252. root_id, parent_id, value = r
  253. assert_equal(root_id, expected_root_id)
  254. assert_equal(parent_id, expected_parent_id)
  255. assert_equal(value, i + 2)
  256. def assert_ids(self, res, size):
  257. i, root = size, res
  258. while root.parent:
  259. root = root.parent
  260. node = res
  261. while node:
  262. root_id, parent_id, value = node.get(timeout=5)
  263. assert_equal(value, i)
  264. assert_equal(root_id, root.id)
  265. if node.parent:
  266. assert_equal(parent_id, node.parent.id)
  267. node = node.parent
  268. i -= 1
  269. @testcase('redis', iterations=1)
  270. def parentids_chord(self):
  271. self.assert_parentids_chord()
  272. self.assert_parentids_chord(uuid(), uuid())
  273. def assert_parentids_chord(self, base_root=None, base_parent=None):
  274. g = (
  275. ids.si(1) |
  276. ids.si(2) |
  277. group(ids.si(i) for i in range(3, 50)) |
  278. collect_ids.s(i=50) |
  279. ids.si(51)
  280. )
  281. g.freeze(root_id=base_root, parent_id=base_parent)
  282. res = g.apply_async(root_id=base_root, parent_id=base_parent)
  283. expected_root_id = base_root or res.parent.parent.parent.id
  284. root_id, parent_id, value = res.get(timeout=5)
  285. assert_equal(value, 51)
  286. assert_equal(root_id, expected_root_id)
  287. assert_equal(parent_id, res.parent.id)
  288. prev, (root_id, parent_id, value) = res.parent.get(timeout=5)
  289. assert_equal(value, 50)
  290. assert_equal(root_id, expected_root_id)
  291. assert_equal(parent_id, res.parent.parent.id)
  292. for i, p in enumerate(prev):
  293. root_id, parent_id, value = p
  294. assert_equal(root_id, expected_root_id)
  295. assert_equal(parent_id, res.parent.parent.id)
  296. root_id, parent_id, value = res.parent.parent.get(timeout=5)
  297. assert_equal(value, 2)
  298. assert_equal(parent_id, res.parent.parent.parent.id)
  299. assert_equal(root_id, expected_root_id)
  300. root_id, parent_id, value = res.parent.parent.parent.get(timeout=5)
  301. assert_equal(value, 1)
  302. assert_equal(root_id, expected_root_id)
  303. assert_equal(parent_id, base_parent)
  304. @testcase('all', 'green')
  305. def manyshort(self):
  306. self.join(group(add.s(i, i) for i in range(1000))(),
  307. timeout=10, propagate=True)
  308. @testcase('all', 'green', iterations=1)
  309. def unicodetask(self):
  310. self.join(group(print_unicode.s() for _ in range(5))(),
  311. timeout=1, propagate=True)
  312. @testcase('all')
  313. def always_timeout(self):
  314. self.join(
  315. group(sleeping.s(1).set(time_limit=0.1)
  316. for _ in range(100))(),
  317. timeout=10, propagate=True,
  318. )
  319. @testcase('all')
  320. def termbysig(self):
  321. self._evil_groupmember(kill)
  322. @testcase('green')
  323. def group_with_exit(self):
  324. self._evil_groupmember(exiting)
  325. @testcase('all')
  326. def timelimits(self):
  327. self._evil_groupmember(sleeping, 2, time_limit=1)
  328. @testcase('all')
  329. def timelimits_soft(self):
  330. self._evil_groupmember(sleeping_ignore_limits, 2,
  331. soft_time_limit=1, time_limit=1.1)
  332. @testcase('all')
  333. def alwayskilled(self):
  334. g = group(kill.s() for _ in range(10))
  335. self.join(g(), timeout=10)
  336. @testcase('all', 'green')
  337. def alwaysexits(self):
  338. g = group(exiting.s() for _ in range(10))
  339. self.join(g(), timeout=10)
  340. def _evil_groupmember(self, evil_t, *eargs, **opts):
  341. g1 = group(add.s(2, 2).set(**opts), evil_t.s(*eargs).set(**opts),
  342. add.s(4, 4).set(**opts), add.s(8, 8).set(**opts))
  343. g2 = group(add.s(3, 3).set(**opts), add.s(5, 5).set(**opts),
  344. evil_t.s(*eargs).set(**opts), add.s(7, 7).set(**opts))
  345. self.join(g1(), timeout=10)
  346. self.join(g2(), timeout=10)
  347. @testcase('all', 'green')
  348. def bigtasksbigvalue(self):
  349. g = group(any_returning.s(BIG, sleep=0.3) for i in range(8))
  350. r = g()
  351. try:
  352. self.join(r, timeout=10)
  353. finally:
  354. # very big values so remove results from backend
  355. try:
  356. r.forget()
  357. except NotImplementedError:
  358. pass
  359. @testcase('all', 'green')
  360. def bigtasks(self, wait=None):
  361. self._revoketerm(wait, False, False, BIG)
  362. @testcase('all', 'green')
  363. def smalltasks(self, wait=None):
  364. self._revoketerm(wait, False, False, SMALL)
  365. @testcase('all')
  366. def revoketermfast(self, wait=None):
  367. self._revoketerm(wait, True, False, SMALL)
  368. @testcase('all')
  369. def revoketermslow(self, wait=5):
  370. self._revoketerm(wait, True, True, BIG)
  371. def _revoketerm(self, wait=None, terminate=True,
  372. joindelay=True, data=BIG):
  373. g = group(any_.s(data, sleep=wait) for i in range(8))
  374. r = g()
  375. if terminate:
  376. if joindelay:
  377. sleep(random.choice(range(4)))
  378. r.revoke(terminate=True)
  379. self.join(r, timeout=10)