stress.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. #!/usr/bin/env python
  2. from __future__ import absolute_import, print_function
  3. import os
  4. import platform
  5. import random
  6. import signal
  7. import sys
  8. from time import time, sleep
  9. from kombu import Exchange, Queue
  10. from kombu.utils.compat import OrderedDict
  11. from celery import Celery, group, VERSION_BANNER
  12. from celery.bin.base import Command, Option
  13. from celery.exceptions import TimeoutError, SoftTimeLimitExceeded
  14. from celery.five import range, values
  15. from celery.utils.debug import blockdetection
  16. from celery.utils.text import indent, pluralize
# Should be run against workers started with each of these option sets:
#
# 1) celery -A stress worker -c 1 --maxtasksperchild=1
# 2) celery -A stress worker -c 8 --maxtasksperchild=1
#
# 3) celery -A stress worker -c 1
# 4) celery -A stress worker -c 8
#
# 5) celery -A stress worker --autoscale=8,0
#
# 6) celery -A stress worker --time-limit=1
#
# 7) celery -A stress worker -c1 --maxtasksperchild=1 -- celery.acks_late=1
  30. BIG = 'x' * 2 ** 20 * 8
  31. SMALL = 'e' * 1024
  32. BANNER = """\
  33. Celery stress-suite v{version}
  34. {platform}
  35. [config]
  36. .> broker: {conninfo}
  37. [toc: {total} {TESTS} total]
  38. {toc}
  39. """
  40. CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', 'c.stress')
  41. CSTRESS_BACKEND = os.environ.get('CSTRESS_BACKEND', 'redis://')
  42. app = Celery(
  43. 'stress', broker='amqp://', backend=CSTRESS_BACKEND,
  44. set_as_current=False,
  45. )
  46. app.conf.update(
  47. CELERYD_PREFETCH_MULTIPLIER=10,
  48. CELERY_DEFAULT_QUEUE=CSTRESS_QUEUE,
  49. CELERY_QUEUES=(
  50. Queue(CSTRESS_QUEUE,
  51. exchange=Exchange(CSTRESS_QUEUE, durable=False),
  52. routing_key=CSTRESS_QUEUE,
  53. durable=False, auto_delete=True),
  54. ),
  55. )
  56. @app.task
  57. def _marker(s, sep='-'):
  58. print('{0} {1} {2}'.format(sep * 3, s, sep * 3))
  59. @app.task
  60. def add(x, y):
  61. return x + y
  62. @app.task
  63. def any_(*args, **kwargs):
  64. wait = kwargs.get('sleep')
  65. if wait:
  66. sleep(wait)
  67. @app.task
  68. def exiting(status=0):
  69. sys.exit(status)
  70. @app.task
  71. def kill(sig=signal.SIGKILL):
  72. os.kill(os.getpid(), sig)
  73. @app.task
  74. def sleeping(i):
  75. sleep(i)
  76. @app.task
  77. def sleeping_ignore_limits(i):
  78. try:
  79. sleep(i)
  80. except SoftTimeLimitExceeded:
  81. sleep(i)
  82. @app.task
  83. def segfault():
  84. import ctypes
  85. ctypes.memset(0, 0, 1)
  86. assert False, 'should not get here'
  87. def marker(s, sep='-'):
  88. print('{0}{1}'.format(sep, s))
  89. _marker.delay(s, sep)
  90. class Stress(Command):
  91. def run(self, *names, **options):
  92. try:
  93. return Suite(self.app,
  94. block_timeout=options.get('block_timeout'),
  95. ).run(names, **options)
  96. except KeyboardInterrupt:
  97. pass
  98. def get_options(self):
  99. return (
  100. Option('-i', '--iterations', type='int', default=50,
  101. help='Number of iterations for each test'),
  102. Option('-n', '--numtests', type='int', default=None,
  103. help='Number of tests to execute'),
  104. Option('-o', '--offset', type='int', default=0,
  105. help='Start at custom offset'),
  106. Option('--block-timeout', type='int', default=30 * 60),
  107. Option('-l', '--list', action='store_true', dest='list_all',
  108. help='List all tests'),
  109. )
  110. class Suite(object):
  111. def __init__(self, app, block_timeout=30 * 60):
  112. self.app = app
  113. self.connerrors = self.app.connection().recoverable_connection_errors
  114. self.block_timeout = block_timeout
  115. self.tests = OrderedDict(
  116. (fun.__name__, fun) for fun in [
  117. self.manyshort,
  118. self.termbysig,
  119. self.bigtasks,
  120. self.smalltasks,
  121. self.timelimits,
  122. self.timelimits_soft,
  123. self.revoketermfast,
  124. self.revoketermslow,
  125. self.alwayskilled,
  126. ]
  127. )
  128. def run(self, names=None, iterations=50, offset=0,
  129. numtests=None, list_all=False, **kw):
  130. tests = self.filtertests(names)[offset:numtests or None]
  131. if list_all:
  132. return print(self.testlist(tests))
  133. print(self.banner(tests))
  134. marker('Stresstest suite start', '+')
  135. for i, test in enumerate(tests):
  136. self.runtest(test, iterations, i + 1)
  137. marker('Stresstest suite end', '+')
  138. def filtertests(self, names):
  139. try:
  140. return ([self.tests[n] for n in names] if names
  141. else list(values(self.tests)))
  142. except KeyError as exc:
  143. raise KeyError('Unknown test name: {0}'.format(exc))
  144. def testlist(self, tests):
  145. return ',\n'.join(
  146. '.> {0}) {1}'.format(i + 1, t.__name__)
  147. for i, t in enumerate(tests)
  148. )
  149. def banner(self, tests):
  150. app = self.app
  151. return BANNER.format(
  152. app='{0}:0x{1:x}'.format(app.main or '__main__', id(app)),
  153. version=VERSION_BANNER,
  154. conninfo=app.connection().as_uri(),
  155. platform=platform.platform(),
  156. toc=self.testlist(tests),
  157. TESTS=pluralize(len(tests), 'test'),
  158. total=len(tests),
  159. )
  160. def manyshort(self):
  161. self.join(group(add.s(i, i) for i in xrange(1000))())
  162. def runtest(self, fun, n=50, index=0):
  163. with blockdetection(self.block_timeout):
  164. t = time()
  165. i = 0
  166. failed = False
  167. marker('{0}: {1}({2})'.format(index, fun.__name__, n))
  168. try:
  169. for i in range(n):
  170. print(i)
  171. fun()
  172. except Exception:
  173. failed = True
  174. raise
  175. finally:
  176. print('{0} {1} iterations in {2}s'.format(
  177. 'failed after' if failed else 'completed',
  178. i + 1, time() - t,
  179. ))
  180. def termbysig(self):
  181. self._evil_groupmember(kill)
  182. def termbysegfault(self):
  183. self._evil_groupmember(segfault)
  184. def timelimits(self):
  185. self._evil_groupmember(sleeping, 2, timeout=1)
  186. def timelimits_soft(self):
  187. self._evil_groupmember(sleeping_ignore_limits, 2,
  188. soft_timeout=1, timeout=1.1)
  189. def alwayskilled(self):
  190. g = group(kill.s() for _ in range(10))
  191. self.join(g(), timeout=10)
  192. def _evil_groupmember(self, evil_t, *eargs, **opts):
  193. g1 = group(add.s(2, 2).set(**opts), evil_t.s(*eargs).set(**opts),
  194. add.s(4, 4).set(**opts), add.s(8, 8).set(**opts))
  195. g2 = group(add.s(3, 3).set(**opts), add.s(5, 5).set(**opts),
  196. evil_t.s(*eargs).set(**opts), add.s(7, 7).set(**opts))
  197. self.join(g1(), timeout=10)
  198. self.join(g2(), timeout=10)
  199. def bigtasks(self, wait=None):
  200. self._revoketerm(wait, False, False, BIG)
  201. def smalltasks(self, wait=None):
  202. self._revoketerm(wait, False, False, SMALL)
  203. def revoketermfast(self, wait=None):
  204. self._revoketerm(wait, True, False, SMALL)
  205. def revoketermslow(self, wait=5):
  206. self._revoketerm(wait, True, True, BIG)
  207. def _revoketerm(self, wait=None, terminate=True,
  208. joindelay=True, data=BIG):
  209. g = group(any_.s(data, sleep=wait) for i in range(8))
  210. r = g()
  211. if terminate:
  212. if joindelay:
  213. sleep(random.choice(range(4)))
  214. r.revoke(terminate=True)
  215. self.join(r, timeout=100)
  216. def join(self, r, **kwargs):
  217. while 1:
  218. try:
  219. return r.get(propagate=False, **kwargs)
  220. except TimeoutError as exc:
  221. print('join timed out: %s' % (exc, ))
  222. except self.connerrors as exc:
  223. print('join: connection lost: %r' % (exc, ))
  224. if __name__ == '__main__':
  225. Stress(app=app).execute_from_commandline()