stress.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. from __future__ import absolute_import
  2. import random
  3. import os
  4. import signal
  5. import sys
  6. from time import time, sleep
  7. from celery import Celery, group
  8. from celery.exceptions import TimeoutError, SoftTimeLimitExceeded
  9. from celery.five import range
  10. from celery.utils.debug import blockdetection
# Run this suite against workers started with each of these option sets:
#
# 1) celery -A stress worker -c 1 --maxtasksperchild=1
# 2) celery -A stress worker -c 8 --maxtasksperchild=1
#
# 3) celery -A stress worker -c 1
# 4) celery -A stress worker -c 8
#
# 5) celery -A stress worker --autoscale=8,0
#
# 6) celery -A stress worker --time-limit=1
#
# 7) celery -A stress worker -c1 --maxtasksperchild=1 -- celery.acks_late=1
  24. BIG = 'x' * 2 ** 20 * 8
  25. SMALL = 'e' * 1024
  26. celery = Celery(
  27. 'stress', broker='pyamqp://', backend='redis://',
  28. set_as_current=False,
  29. )
  30. celery.conf.update(
  31. CELERYD_PREFETCH_MULTIPLIER=1,
  32. )
  33. @celery.task
  34. def _marker(s, sep='-'):
  35. print('{0} {1} {2}'.format(sep * 3, s, sep * 3))
  36. @celery.task
  37. def add(x, y):
  38. return x + y
  39. @celery.task
  40. def any_(*args, **kwargs):
  41. wait = kwargs.get('sleep')
  42. if wait:
  43. sleep(wait)
  44. @celery.task
  45. def exiting(status=0):
  46. sys.exit(status)
  47. @celery.task
  48. def kill(sig=signal.SIGKILL):
  49. os.kill(os.getpid(), sig)
  50. @celery.task
  51. def sleeping(i):
  52. sleep(i)
  53. @celery.task
  54. def sleeping_ignore_limits(i):
  55. try:
  56. sleep(i)
  57. except SoftTimeLimitExceeded:
  58. sleep(i)
  59. @celery.task
  60. def segfault():
  61. import ctypes
  62. ctypes.memset(0, 0, 1)
  63. assert False, 'should not get here'
  64. def marker(s, sep='-'):
  65. print('{0}{1}'.format(sep, s))
  66. _marker.delay(s, sep)
  67. class Stresstests(object):
  68. def __init__(self, app, block_timeout=30 * 60):
  69. self.app = app
  70. self.connerrors = self.app.connection().recoverable_connection_errors
  71. self.block_timeout = block_timeout
  72. def run(self, n=50):
  73. marker('Stresstest suite start', '+')
  74. tests = [self.manyshort,
  75. self.termbysig,
  76. self.bigtasks,
  77. self.smalltasks,
  78. self.timelimits,
  79. self.timelimits_soft,
  80. self.revoketermfast,
  81. self.revoketermslow]
  82. for test in tests:
  83. self.runtest(test, n)
  84. marker('Stresstest suite end', '+')
  85. def manyshort(self):
  86. self.join(group(add.s(i, i) for i in xrange(1000))())
  87. def runtest(self, fun, n=50):
  88. with blockdetection(self.block_timeout):
  89. t = time()
  90. i = 0
  91. failed = False
  92. marker('{0}({1})'.format(fun.__name__, n))
  93. try:
  94. for i in range(n):
  95. print(i)
  96. fun()
  97. except Exception:
  98. failed = True
  99. raise
  100. finally:
  101. print('{0} {1} iterations in {2}s'.format(
  102. 'failed after' if failed else 'completed',
  103. i + 1, time() - t,
  104. ))
  105. def termbysig(self):
  106. self._evil_groupmember(kill)
  107. def termbysegfault(self):
  108. self._evil_groupmember(segfault)
  109. def timelimits(self):
  110. self._evil_groupmember(sleeping, 2, timeout=1)
  111. def timelimits_soft(self):
  112. self._evil_groupmember(sleeping_ignore_limits, 2,
  113. soft_timeout=1, timeout=1.1)
  114. def _evil_groupmember(self, evil_t, *eargs, **opts):
  115. g1 = group(add.s(2, 2).set(**opts), evil_t.s(*eargs).set(**opts),
  116. add.s(4, 4).set(**opts), add.s(8, 8).set(**opts))
  117. g2 = group(add.s(3, 3).set(**opts), add.s(5, 5).set(**opts),
  118. evil_t.s(*eargs).set(**opts), add.s(7, 7).set(**opts))
  119. self.join(g1(), timeout=10)
  120. self.join(g2(), timeout=10)
  121. def bigtasks(self, wait=None):
  122. self._revoketerm(wait, False, False, BIG)
  123. def smalltasks(self, wait=None):
  124. self._revoketerm(wait, False, False, SMALL)
  125. def revoketermfast(self, wait=None):
  126. self._revoketerm(wait, True, False, SMALL)
  127. def revoketermslow(self, wait=5):
  128. self._revoketerm(wait, True, True, BIG)
  129. def _revoketerm(self, wait=None, terminate=True,
  130. joindelay=True, data=BIG):
  131. g = group(any_.s(data, sleep=wait) for i in range(8))
  132. r = g()
  133. if terminate:
  134. if joindelay:
  135. sleep(random.choice(range(4)))
  136. r.revoke(terminate=True)
  137. self.join(r, timeout=100)
  138. def join(self, r, **kwargs):
  139. while 1:
  140. try:
  141. return r.get(propagate=False, **kwargs)
  142. except TimeoutError as exc:
  143. print('join timed out: %s' % (exc, ))
  144. except self.connerrors as exc:
  145. print('join: connection lost: %r' % (exc, ))
  146. if __name__ == '__main__':
  147. s = Stresstests(celery)
  148. s.run()