test_celeryevdump.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from __future__ import absolute_import
  2. from mock import Mock, patch
  3. from time import time
  4. from celery.events.dumper import (
  5. humanize_type,
  6. Dumper,
  7. evdump,
  8. )
  9. from celery.tests.case import Case, WhateverIO
  10. class test_Dumper(Case):
  11. def setUp(self):
  12. self.out = WhateverIO()
  13. self.dumper = Dumper(out=self.out)
  14. def test_humanize_type(self):
  15. self.assertEqual(humanize_type('worker-offline'), 'shutdown')
  16. self.assertEqual(humanize_type('task-started'), 'task started')
  17. def test_format_task_event(self):
  18. self.dumper.format_task_event(
  19. 'worker.example.com', time(), 'task-started', 'tasks.add', {})
  20. self.assertTrue(self.out.getvalue())
  21. def test_on_event(self):
  22. event = {
  23. 'hostname': 'worker.example.com',
  24. 'timestamp': time(),
  25. 'uuid': '1ef',
  26. 'name': 'tasks.add',
  27. 'args': '(2, 2)',
  28. 'kwargs': '{}',
  29. }
  30. self.dumper.on_event(dict(event, type='task-received'))
  31. self.assertTrue(self.out.getvalue())
  32. self.dumper.on_event(dict(event, type='task-revoked'))
  33. self.dumper.on_event(dict(event, type='worker-online'))
  34. @patch('celery.events.EventReceiver.capture')
  35. def test_evdump(self, capture):
  36. capture.side_effect = KeyboardInterrupt()
  37. evdump()
  38. def test_evdump_error_handler(self):
  39. app = Mock(name='app')
  40. with patch('celery.events.dumper.Dumper') as Dumper:
  41. Dumper.return_value = Mock(name='dumper')
  42. recv = app.events.Receiver.return_value = Mock()
  43. def se(*_a, **_k):
  44. recv.capture.side_effect = SystemExit()
  45. raise KeyError()
  46. recv.capture.side_effect = se
  47. Conn = app.connection.return_value = Mock(name='conn')
  48. conn = Conn.clone.return_value = Mock(name='cloned_conn')
  49. conn.connection_errors = (KeyError, )
  50. conn.channel_errors = ()
  51. evdump(app)
  52. self.assertTrue(conn.ensure_connection.called)
  53. errback = conn.ensure_connection.call_args[0][0]
  54. errback(KeyError(), 1)
  55. self.assertTrue(conn.as_uri.called)