# test_purge.py (756 B)
  1. from __future__ import absolute_import, unicode_literals
  2. from case import Mock
  3. from celery.bin.purge import purge
  4. from celery.five import WhateverIO
  5. class test_purge:
  6. def test_run(self):
  7. out = WhateverIO()
  8. a = purge(app=self.app, stdout=out)
  9. a._purge = Mock(name='_purge')
  10. a._purge.return_value = 0
  11. a.run(force=True)
  12. assert 'No messages purged' in out.getvalue()
  13. a._purge.return_value = 100
  14. a.run(force=True)
  15. assert '100 messages' in out.getvalue()
  16. a.out = Mock(name='out')
  17. a.ask = Mock(name='ask')
  18. a.run(force=False)
  19. a.ask.assert_called_with(a.warn_prompt, ('yes', 'no'), 'no')
  20. a.ask.return_value = 'yes'
  21. a.run(force=False)