test_events.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from __future__ import absolute_import, unicode_literals
  2. from celery.bin import events
  3. from celery.tests.case import AppCase, patch, _old_patch, skip_unless_module
  4. class MockCommand(object):
  5. executed = []
  6. def execute_from_commandline(self, **kwargs):
  7. self.executed.append(True)
  8. def proctitle(prog, info=None):
  9. proctitle.last = (prog, info)
  10. proctitle.last = ()
  11. class test_events(AppCase):
  12. def setup(self):
  13. self.ev = events.events(app=self.app)
  14. @_old_patch('celery.events.dumper', 'evdump',
  15. lambda **kw: 'me dumper, you?')
  16. @_old_patch('celery.bin.events', 'set_process_title', proctitle)
  17. def test_run_dump(self):
  18. self.assertEqual(self.ev.run(dump=True), 'me dumper, you?')
  19. self.assertIn('celery events:dump', proctitle.last[0])
  20. @skip_unless_module('curses')
  21. def test_run_top(self):
  22. @_old_patch('celery.events.cursesmon', 'evtop',
  23. lambda **kw: 'me top, you?')
  24. @_old_patch('celery.bin.events', 'set_process_title', proctitle)
  25. def _inner():
  26. self.assertEqual(self.ev.run(), 'me top, you?')
  27. self.assertIn('celery events:top', proctitle.last[0])
  28. return _inner()
  29. @_old_patch('celery.events.snapshot', 'evcam',
  30. lambda *a, **k: (a, k))
  31. @_old_patch('celery.bin.events', 'set_process_title', proctitle)
  32. def test_run_cam(self):
  33. a, kw = self.ev.run(camera='foo.bar.baz', logfile='logfile')
  34. self.assertEqual(a[0], 'foo.bar.baz')
  35. self.assertEqual(kw['freq'], 1.0)
  36. self.assertIsNone(kw['maxrate'])
  37. self.assertEqual(kw['loglevel'], 'INFO')
  38. self.assertEqual(kw['logfile'], 'logfile')
  39. self.assertIn('celery events:cam', proctitle.last[0])
  40. @patch('celery.events.snapshot.evcam')
  41. @patch('celery.bin.events.detached')
  42. def test_run_cam_detached(self, detached, evcam):
  43. self.ev.prog_name = 'celery events'
  44. self.ev.run_evcam('myapp.Camera', detach=True)
  45. self.assertTrue(detached.called)
  46. self.assertTrue(evcam.called)
  47. def test_get_options(self):
  48. self.assertFalse(self.ev.get_options())
  49. @_old_patch('celery.bin.events', 'events', MockCommand)
  50. def test_main(self):
  51. MockCommand.executed = []
  52. events.main()
  53. self.assertTrue(MockCommand.executed)