test_celeryevdump.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. from __future__ import absolute_import, unicode_literals
  2. from time import time
  3. from case import Mock, patch
  4. from celery.events.dumper import Dumper, evdump, humanize_type
  5. from celery.five import WhateverIO
  6. class test_Dumper:
  7. def setup(self):
  8. self.out = WhateverIO()
  9. self.dumper = Dumper(out=self.out)
  10. def test_humanize_type(self):
  11. assert humanize_type('worker-offline') == 'shutdown'
  12. assert humanize_type('task-started') == 'task started'
  13. def test_format_task_event(self):
  14. self.dumper.format_task_event(
  15. 'worker@example.com', time(), 'task-started', 'tasks.add', {})
  16. assert self.out.getvalue()
  17. def test_on_event(self):
  18. event = {
  19. 'hostname': 'worker@example.com',
  20. 'timestamp': time(),
  21. 'uuid': '1ef',
  22. 'name': 'tasks.add',
  23. 'args': '(2, 2)',
  24. 'kwargs': '{}',
  25. }
  26. self.dumper.on_event(dict(event, type='task-received'))
  27. assert self.out.getvalue()
  28. self.dumper.on_event(dict(event, type='task-revoked'))
  29. self.dumper.on_event(dict(event, type='worker-online'))
  30. @patch('celery.events.EventReceiver.capture')
  31. def test_evdump(self, capture):
  32. capture.side_effect = KeyboardInterrupt()
  33. evdump(app=self.app)
  34. def test_evdump_error_handler(self):
  35. app = Mock(name='app')
  36. with patch('celery.events.dumper.Dumper') as Dumper:
  37. Dumper.return_value = Mock(name='dumper')
  38. recv = app.events.Receiver.return_value = Mock()
  39. def se(*_a, **_k):
  40. recv.capture.side_effect = SystemExit()
  41. raise KeyError()
  42. recv.capture.side_effect = se
  43. Conn = app.connection_for_read.return_value = Mock(name='conn')
  44. conn = Conn.clone.return_value = Mock(name='cloned_conn')
  45. conn.connection_errors = (KeyError,)
  46. conn.channel_errors = ()
  47. evdump(app)
  48. conn.ensure_connection.assert_called()
  49. errback = conn.ensure_connection.call_args[0][0]
  50. errback(KeyError(), 1)
  51. conn.as_uri.assert_called()