testbuf.py 490 B

123456789101112131415161718192021222324
  1. from __future__ import absolute_import
  2. import sys
  3. import time
  4. from celery.result import ResultSet
  5. from stress.app import app, sleeping
  6. def testbuf(padbytes=0, megabytes=0):
  7. padding = float(padbytes) + 2 ** 20 * float(megabytes)
  8. results = []
  9. print('> padding: %r' % (padding, ))
  10. for i in range(8 * 4):
  11. results.append(sleeping.delay(1, kw='x' * int(padding)))
  12. time.sleep(0.01)
  13. res = ResultSet(results)
  14. print(res.join())
  15. testbuf(*sys.argv[1:])