__main__.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. from __future__ import absolute_import, print_function, unicode_literals
  2. from celery.bin.base import Command, Option
  3. from .app import app
  4. from .suite import Suite
  5. class Stress(Command):
  6. def run(self, *names, **options):
  7. try:
  8. return Suite(
  9. self.app,
  10. block_timeout=options.get('block_timeout'),
  11. ).run(names, **options)
  12. except KeyboardInterrupt:
  13. pass
  14. def get_options(self):
  15. return (
  16. Option('-i', '--iterations', type='int', default=50,
  17. help='Number of iterations for each test'),
  18. Option('-n', '--numtests', type='int', default=None,
  19. help='Number of tests to execute'),
  20. Option('-o', '--offset', type='int', default=0,
  21. help='Start at custom offset'),
  22. Option('--block-timeout', type='int', default=30 * 60),
  23. Option('-l', '--list', action='store_true', dest='list_all',
  24. help='List all tests'),
  25. Option('-r', '--repeat', type='float', default=0,
  26. help='Number of times to repeat the test suite'),
  27. Option('-g', '--group', default='all',
  28. help='Specify test group (all|green)'),
  29. Option('--diag', default=False, action='store_true',
  30. help='Enable diagnostics (slow)'),
  31. Option('-J', '--no-join', default=False, action='store_true',
  32. help='Do not wait for task results'),
  33. )
  34. if __name__ == '__main__':
  35. Stress(app=app).execute_from_commandline()