123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- from __future__ import absolute_import
- from mock import Mock, patch
- from time import time
- from celery.events.dumper import (
- humanize_type,
- Dumper,
- evdump,
- )
- from celery.tests.case import Case, WhateverIO
class test_Dumper(Case):
    """Tests for ``humanize_type``, ``Dumper`` and ``evdump``."""

    def setUp(self):
        # Give every test a fresh buffer and a dumper that writes into it.
        self.out = WhateverIO()
        self.dumper = Dumper(out=self.out)

    def test_humanize_type(self):
        # Each pair is (raw event type, expected humanized form).
        expectations = (
            ('worker-offline', 'shutdown'),
            ('task-started', 'task started'),
        )
        for event_type, readable in expectations:
            self.assertEqual(humanize_type(event_type), readable)

    def test_format_task_event(self):
        # Formatting a task event must write something to the buffer.
        now = time()
        self.dumper.format_task_event(
            'worker.example.com', now, 'task-started', 'tasks.add', {})
        written = self.out.getvalue()
        self.assertTrue(written)

    def test_on_event(self):
        # Fields shared by every event sent below; only ``type`` varies.
        base_event = {
            'hostname': 'worker.example.com',
            'timestamp': time(),
            'uuid': '1ef',
            'name': 'tasks.add',
            'args': '(2, 2)',
            'kwargs': '{}',
        }

        def emit(event_type):
            self.dumper.on_event(dict(base_event, type=event_type))

        emit('task-received')
        self.assertTrue(self.out.getvalue())

        # The remaining event types are only checked for not raising.
        emit('task-revoked')
        emit('worker-online')

    @patch('celery.events.EventReceiver.capture')
    def test_evdump(self, capture):
        # Run ``evdump`` with ``capture`` patched to raise
        # KeyboardInterrupt as soon as it is called.
        capture.side_effect = KeyboardInterrupt()
        evdump()

    def test_evdump_error_handler(self):
        app = Mock(name='app')
        with patch('celery.events.dumper.Dumper') as dumper_cls:
            dumper_cls.return_value = Mock(name='dumper')

            receiver = Mock()
            app.events.Receiver.return_value = receiver

            def fail_then_exit(*_args, **_kwargs):
                # First call: arm SystemExit for the next call, then raise
                # KeyError (listed in ``connection_errors`` below).
                receiver.capture.side_effect = SystemExit()
                raise KeyError()

            receiver.capture.side_effect = fail_then_exit

            connection = Mock(name='conn')
            app.connection.return_value = connection
            cloned = Mock(name='cloned_conn')
            connection.clone.return_value = cloned
            cloned.connection_errors = (KeyError, )
            cloned.channel_errors = ()

            evdump(app)
            self.assertTrue(cloned.ensure_connection.called)

            # The first positional argument given to ``ensure_connection``
            # is the error callback; call it directly and check that it
            # asked the connection for its URI.
            positional_args = cloned.ensure_connection.call_args[0]
            errback = positional_args[0]
            errback(KeyError(), 1)
            self.assertTrue(cloned.as_uri.called)
|