123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- from __future__ import absolute_import
- import os
- from itertools import count
- from celery.concurrency.base import apply_target, BasePool
- from celery.exceptions import WorkerShutdown, WorkerTerminate
- from celery.tests.case import AppCase, Mock, patch
class test_BasePool(AppCase):
    """Tests for ``apply_target`` and the default ``BasePool`` interface."""

    # NOTE: per-test intent is given in ``#`` comments rather than
    # docstrings so that test-runner output keeps showing method names.

    def test_apply_target(self):
        # Verifies call order and arguments: accept callback first, then
        # the target, then the result callback.
        scratch = {}
        counter = count(0)

        def gen_callback(name, retval=None):
            # Returns a callable that records (call order, positional args)
            # in ``scratch`` under ``name`` and then returns ``retval``.

            def callback(*args):
                scratch[name] = (next(counter), args)
                return retval

            return callback

        apply_target(gen_callback('target', 42),
                     args=(8, 16),
                     callback=gen_callback('callback'),
                     accept_callback=gen_callback('accept_callback'))

        # Per-key checks replace assertDictContainsSubset, which is
        # deprecated since Python 3.2 and removed in Python 3.12.
        expected = {'target': (1, (8, 16)), 'callback': (2, (42,))}
        for key, value in expected.items():
            self.assertIn(key, scratch)
            self.assertEqual(scratch[key], value)

        # The accept callback ran first (order 0) and was given the
        # current process id followed by a truthy second argument.
        pa1 = scratch['accept_callback']
        self.assertEqual(0, pa1[0])
        self.assertEqual(pa1[1][0], os.getpid())
        self.assertTrue(pa1[1][1])

        # No accept callback
        scratch.clear()
        apply_target(gen_callback('target', 42),
                     args=(8, 16),
                     callback=gen_callback('callback'),
                     accept_callback=None)
        # The shared counter keeps running, hence order values 3 and 4.
        self.assertDictEqual(scratch,
                             {'target': (3, (8, 16)),
                              'callback': (4, (42,))})

    def test_apply_target__propagate(self):
        # An exception type listed in ``propagate`` is re-raised.
        target = Mock(name='target')
        target.side_effect = KeyError()
        with self.assertRaises(KeyError):
            apply_target(target, propagate=(KeyError,))

    def test_apply_target__raises(self):
        # A KeyError from the target is re-raised even without
        # ``propagate``.
        target = Mock(name='target')
        target.side_effect = KeyError()
        with self.assertRaises(KeyError):
            apply_target(target)

    def test_apply_target__raises_WorkerShutdown(self):
        # WorkerShutdown raised by the target reaches the caller.
        target = Mock(name='target')
        target.side_effect = WorkerShutdown()
        with self.assertRaises(WorkerShutdown):
            apply_target(target)

    def test_apply_target__raises_WorkerTerminate(self):
        # WorkerTerminate raised by the target reaches the caller.
        target = Mock(name='target')
        target.side_effect = WorkerTerminate()
        with self.assertRaises(WorkerTerminate):
            apply_target(target)

    def test_apply_target__raises_BaseException(self):
        # A bare BaseException from the target does not escape
        # apply_target; the callback is still invoked.
        target = Mock(name='target')
        callback = Mock(name='callback')
        target.side_effect = BaseException()
        apply_target(target, callback=callback)
        self.assertTrue(callback.called)

    @patch('celery.concurrency.base.reraise')
    def test_apply_target__raises_BaseException_raises_else(self, reraise):
        # With ``reraise`` patched to raise KeyError while the target's
        # BaseException is being handled, the KeyError propagates and the
        # callback is never invoked.
        target = Mock(name='target')
        callback = Mock(name='callback')
        reraise.side_effect = KeyError()
        target.side_effect = BaseException()
        with self.assertRaises(KeyError):
            apply_target(target, callback=callback)
        self.assertFalse(callback.called)

    def test_does_not_debug(self):
        # Smoke test: apply_async runs with ``_does_debug`` switched off.
        x = BasePool(10)
        x._does_debug = False
        x.apply_async(object)

    def test_num_processes(self):
        # ``num_processes`` equals the value passed to the constructor.
        self.assertEqual(BasePool(7).num_processes, 7)

    def test_interface_on_start(self):
        # Smoke test: the default on_start hook is callable.
        BasePool(10).on_start()

    def test_interface_on_stop(self):
        # Smoke test: the default on_stop hook is callable.
        BasePool(10).on_stop()

    def test_interface_on_apply(self):
        # Smoke test: the default on_apply hook is callable.
        BasePool(10).on_apply()

    def test_interface_info(self):
        # The default ``info`` is an empty dict.
        self.assertDictEqual(BasePool(10).info, {})

    def test_interface_flush(self):
        # The default flush() returns None.
        self.assertIsNone(BasePool(10).flush())

    def test_active(self):
        # ``active`` is falsy on a fresh pool and truthy once the state
        # is set to RUN.
        p = BasePool(10)
        self.assertFalse(p.active)
        p._state = p.RUN
        self.assertTrue(p.active)

    def test_restart(self):
        # restart() is not implemented by the base pool.
        p = BasePool(10)
        with self.assertRaises(NotImplementedError):
            p.restart()

    def test_interface_on_terminate(self):
        # Smoke test: the default on_terminate hook is callable.
        p = BasePool(10)
        p.on_terminate()

    def test_interface_terminate_job(self):
        # terminate_job() is not implemented by the base pool.
        with self.assertRaises(NotImplementedError):
            BasePool(10).terminate_job(101)

    def test_interface_did_start_ok(self):
        # The default did_start_ok() returns a truthy value.
        self.assertTrue(BasePool(10).did_start_ok())

    def test_interface_register_with_event_loop(self):
        # The default register_with_event_loop() returns None.
        self.assertIsNone(
            BasePool(10).register_with_event_loop(Mock()),
        )

    def test_interface_on_soft_timeout(self):
        # The default on_soft_timeout() returns None.
        self.assertIsNone(BasePool(10).on_soft_timeout(Mock()))

    def test_interface_on_hard_timeout(self):
        # The default on_hard_timeout() returns None.
        self.assertIsNone(BasePool(10).on_hard_timeout(Mock()))

    def test_interface_close(self):
        # close() moves the pool to the CLOSE state and calls on_close
        # with no arguments.
        p = BasePool(10)
        p.on_close = Mock()
        p.close()
        self.assertEqual(p._state, p.CLOSE)
        p.on_close.assert_called_with()

    def test_interface_no_close(self):
        # The default on_close() returns None.
        self.assertIsNone(BasePool(10).on_close())
|