"""Unit tests for ``celery.events.dumper``."""
from __future__ import absolute_import, unicode_literals

from time import time

from case import Mock, patch

from celery.five import WhateverIO
from celery.events.dumper import (
    humanize_type,
    Dumper,
    evdump,
)


class test_Dumper:
    """Exercise ``humanize_type``, ``Dumper`` and ``evdump``."""

    def setup(self):
        """Create a ``Dumper`` that writes into a ``WhateverIO`` buffer.

        The buffer is kept on ``self.out`` so tests can inspect what the
        dumper wrote.
        """
        buf = WhateverIO()
        self.out = buf
        self.dumper = Dumper(out=buf)

    def test_humanize_type(self):
        """Check the readable names produced for two event types."""
        expectations = (
            ('worker-offline', 'shutdown'),
            ('task-started', 'task started'),
        )
        for event_type, readable in expectations:
            assert humanize_type(event_type) == readable

    def test_format_task_event(self):
        """Formatting a task event must leave output in the buffer."""
        now = time()
        self.dumper.format_task_event(
            'worker@example.com', now, 'task-started', 'tasks.add', {})
        assert self.out.getvalue()

    def test_on_event(self):
        """Feed one worker event and two task events through ``on_event``.

        Only the output of the first event is asserted; the other two are
        dispatched without any check beyond not raising.
        """
        template = {
            'hostname': 'worker@example.com',
            'timestamp': time(),
            'uuid': '1ef',
            'name': 'tasks.add',
            'args': '(2, 2)',
            'kwargs': '{}',
        }

        def emit(event_type):
            # Build a new dict per call so every event starts from the
            # same template fields.
            self.dumper.on_event(dict(template, type=event_type))

        emit('task-received')
        assert self.out.getvalue()
        for event_type in ('task-revoked', 'worker-online'):
            emit(event_type)

    @patch('celery.events.EventReceiver.capture')
    def test_evdump(self, capture):
        """``evdump`` is expected to return when capture is interrupted.

        The patched ``capture`` raises ``KeyboardInterrupt``; the test
        passes only if that does not propagate out of ``evdump``.
        """
        # NOTE(review): ``self.app`` is not assigned anywhere in this class;
        # presumably the test harness injects it -- confirm.
        capture.side_effect = KeyboardInterrupt()
        evdump(app=self.app)

    def test_evdump_error_handler(self):
        """Drive ``evdump`` through a failed capture, then call its errback."""
        app = Mock(name='app')
        with patch('celery.events.dumper.Dumper') as dumper_cls:
            dumper_cls.return_value = Mock(name='dumper')
            receiver = Mock()
            app.events.Receiver.return_value = receiver

            def fail_then_exit(*_a, **_k):
                # The first call raises KeyError; re-arm the mock so that
                # the following call raises SystemExit instead.
                receiver.capture.side_effect = SystemExit()
                raise KeyError()

            receiver.capture.side_effect = fail_then_exit

            connection = Mock(name='conn')
            app.connection_for_read.return_value = connection
            cloned = Mock(name='cloned_conn')
            connection.clone.return_value = cloned
            cloned.connection_errors = (KeyError,)
            cloned.channel_errors = ()

            evdump(app)
            cloned.ensure_connection.assert_called()
            # The first positional argument handed to ensure_connection is
            # the error callback; calling it is expected to use as_uri().
            handler = cloned.ensure_connection.call_args[0][0]
            handler(KeyError(), 1)
            cloned.as_uri.assert_called()