123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- from __future__ import absolute_import, unicode_literals
- from time import time
- from case import Mock, patch
- from celery.five import WhateverIO
- from celery.events.dumper import (
- humanize_type,
- Dumper,
- evdump,
- )
- class test_Dumper:
- def setup(self):
- self.out = WhateverIO()
- self.dumper = Dumper(out=self.out)
- def test_humanize_type(self):
- assert humanize_type('worker-offline') == 'shutdown'
- assert humanize_type('task-started') == 'task started'
- def test_format_task_event(self):
- self.dumper.format_task_event(
- 'worker@example.com', time(), 'task-started', 'tasks.add', {})
- assert self.out.getvalue()
- def test_on_event(self):
- event = {
- 'hostname': 'worker@example.com',
- 'timestamp': time(),
- 'uuid': '1ef',
- 'name': 'tasks.add',
- 'args': '(2, 2)',
- 'kwargs': '{}',
- }
- self.dumper.on_event(dict(event, type='task-received'))
- assert self.out.getvalue()
- self.dumper.on_event(dict(event, type='task-revoked'))
- self.dumper.on_event(dict(event, type='worker-online'))
- @patch('celery.events.EventReceiver.capture')
- def test_evdump(self, capture):
- capture.side_effect = KeyboardInterrupt()
- evdump(app=self.app)
- def test_evdump_error_handler(self):
- app = Mock(name='app')
- with patch('celery.events.dumper.Dumper') as Dumper:
- Dumper.return_value = Mock(name='dumper')
- recv = app.events.Receiver.return_value = Mock()
- def se(*_a, **_k):
- recv.capture.side_effect = SystemExit()
- raise KeyError()
- recv.capture.side_effect = se
- Conn = app.connection_for_read.return_value = Mock(name='conn')
- conn = Conn.clone.return_value = Mock(name='cloned_conn')
- conn.connection_errors = (KeyError,)
- conn.channel_errors = ()
- evdump(app)
- conn.ensure_connection.assert_called()
- errback = conn.ensure_connection.call_args[0][0]
- errback(KeyError(), 1)
- conn.as_uri.assert_called()
|