test_celeryevdump.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. import io
  2. from time import time
  3. from case import Mock, patch
  4. from celery.events.dumper import (
  5. humanize_type,
  6. Dumper,
  7. evdump,
  8. )
  9. class test_Dumper:
  10. def setup(self):
  11. self.out = io.StringIO()
  12. self.dumper = Dumper(out=self.out)
  13. def test_humanize_type(self):
  14. assert humanize_type('worker-offline') == 'shutdown'
  15. assert humanize_type('task-started') == 'task started'
  16. def test_format_task_event(self):
  17. self.dumper.format_task_event(
  18. 'worker@example.com', time(), 'task-started', 'tasks.add', {})
  19. assert self.out.getvalue()
  20. def test_on_event(self):
  21. event = {
  22. 'hostname': 'worker@example.com',
  23. 'timestamp': time(),
  24. 'uuid': '1ef',
  25. 'name': 'tasks.add',
  26. 'args': '(2, 2)',
  27. 'kwargs': '{}',
  28. }
  29. self.dumper.on_event(dict(event, type='task-received'))
  30. assert self.out.getvalue()
  31. self.dumper.on_event(dict(event, type='task-revoked'))
  32. self.dumper.on_event(dict(event, type='worker-online'))
  33. @patch('celery.events.EventReceiver.capture')
  34. def test_evdump(self, capture):
  35. capture.side_effect = KeyboardInterrupt()
  36. evdump(app=self.app)
  37. def test_evdump_error_handler(self):
  38. app = Mock(name='app')
  39. with patch('celery.events.dumper.Dumper') as Dumper:
  40. Dumper.return_value = Mock(name='dumper')
  41. recv = app.events.Receiver.return_value = Mock()
  42. def se(*_a, **_k):
  43. recv.capture.side_effect = SystemExit()
  44. raise KeyError()
  45. recv.capture.side_effect = se
  46. Conn = app.connection_for_read.return_value = Mock(name='conn')
  47. conn = Conn.clone.return_value = Mock(name='cloned_conn')
  48. conn.connection_errors = (KeyError,)
  49. conn.channel_errors = ()
  50. evdump(app)
  51. conn.ensure_connection.assert_called()
  52. errback = conn.ensure_connection.call_args[0][0]
  53. errback(KeyError(), 1)
  54. conn.as_uri.assert_called()