__main__.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. from __future__ import absolute_import, print_function
  2. from celery.bin.base import Command, Option
  3. from .app import app
  4. from .suite import Suite
  5. class Stress(Command):
  6. def run(self, *names, **options):
  7. try:
  8. return Suite(
  9. self.app,
  10. block_timeout=options.get('block_timeout'),
  11. ).run(names, **options)
  12. except KeyboardInterrupt:
  13. pass
  14. def get_options(self):
  15. return (
  16. Option('-i', '--iterations', type='int', default=50,
  17. help='Number of iterations for each test'),
  18. Option('-n', '--numtests', type='int', default=None,
  19. help='Number of tests to execute'),
  20. Option('-o', '--offset', type='int', default=0,
  21. help='Start at custom offset'),
  22. Option('--block-timeout', type='int', default=30 * 60),
  23. Option('-l', '--list', action='store_true', dest='list_all',
  24. help='List all tests'),
  25. Option('-r', '--repeat', type='float', default=0,
  26. help='Number of times to repeat the test suite'),
  27. )
  28. if __name__ == '__main__':
  29. Stress(app=app).execute_from_commandline()