test_celeryevdump.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from __future__ import absolute_import, unicode_literals
  2. from time import time
  3. from case import Mock, patch
  4. from celery.five import WhateverIO
  5. from celery.events.dumper import (
  6. humanize_type,
  7. Dumper,
  8. evdump,
  9. )
  10. class test_Dumper:
  11. def setup(self):
  12. self.out = WhateverIO()
  13. self.dumper = Dumper(out=self.out)
  14. def test_humanize_type(self):
  15. assert humanize_type('worker-offline') == 'shutdown'
  16. assert humanize_type('task-started') == 'task started'
  17. def test_format_task_event(self):
  18. self.dumper.format_task_event(
  19. 'worker@example.com', time(), 'task-started', 'tasks.add', {})
  20. assert self.out.getvalue()
  21. def test_on_event(self):
  22. event = {
  23. 'hostname': 'worker@example.com',
  24. 'timestamp': time(),
  25. 'uuid': '1ef',
  26. 'name': 'tasks.add',
  27. 'args': '(2, 2)',
  28. 'kwargs': '{}',
  29. }
  30. self.dumper.on_event(dict(event, type='task-received'))
  31. assert self.out.getvalue()
  32. self.dumper.on_event(dict(event, type='task-revoked'))
  33. self.dumper.on_event(dict(event, type='worker-online'))
  34. @patch('celery.events.EventReceiver.capture')
  35. def test_evdump(self, capture):
  36. capture.side_effect = KeyboardInterrupt()
  37. evdump(app=self.app)
  38. def test_evdump_error_handler(self):
  39. app = Mock(name='app')
  40. with patch('celery.events.dumper.Dumper') as Dumper:
  41. Dumper.return_value = Mock(name='dumper')
  42. recv = app.events.Receiver.return_value = Mock()
  43. def se(*_a, **_k):
  44. recv.capture.side_effect = SystemExit()
  45. raise KeyError()
  46. recv.capture.side_effect = se
  47. Conn = app.connection_for_read.return_value = Mock(name='conn')
  48. conn = Conn.clone.return_value = Mock(name='cloned_conn')
  49. conn.connection_errors = (KeyError,)
  50. conn.channel_errors = ()
  51. evdump(app)
  52. conn.ensure_connection.assert_called()
  53. errback = conn.ensure_connection.call_args[0][0]
  54. errback(KeyError(), 1)
  55. conn.as_uri.assert_called()