| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686 | from __future__ import absolute_import, unicode_literalsimport socketimport sysfrom collections import defaultdictfrom datetime import datetime, timedeltaimport pytestfrom case import Mock, call, patchfrom kombu import pidboxfrom kombu.utils.uuid import uuidfrom celery.five import Queue as FastQueuefrom celery.utils.collections import AttributeDictfrom celery.utils.timer2 import Timerfrom celery.worker import WorkController as _WC  # noqafrom celery.worker import consumer, controlfrom celery.worker import state as worker_statefrom celery.worker.pidbox import Pidbox, gPidboxfrom celery.worker.request import Requestfrom celery.worker.state import revokedhostname = socket.gethostname()class WorkController(object):    autoscaler = None    def stats(self):        return {'total': worker_state.total_count}class Consumer(consumer.Consumer):    def __init__(self, app):        self.app = app        self.buffer = FastQueue()        self.timer = Timer()        self.event_dispatcher = Mock()        self.controller = WorkController()        self.task_consumer = Mock()        self.prefetch_multiplier = 1        self.initial_prefetch_count = 1        from celery.concurrency.base import BasePool        self.pool = BasePool(10)        self.task_buckets = defaultdict(lambda: None)        self.hub = None    def call_soon(self, p, *args, **kwargs):        return p(*args, **kwargs)class test_Pidbox:    def test_shutdown(self):        with patch('celery.worker.pidbox.ignore_errors') as eig:            parent = Mock()            pbox = Pidbox(parent)            pbox._close_channel = Mock()            assert pbox.c is parent            pconsumer = pbox.consumer = Mock()            cancel = pconsumer.cancel            pbox.shutdown(parent)            eig.assert_called_with(parent, cancel)            pbox._close_channel.assert_called_with(parent)class test_Pidbox_green:    def test_stop(self):        parent = Mock()        g = gPidbox(parent)        stopped = g._node_stopped = Mock()        shutdown = g._node_shutdown = Mock()        close_chan = g._close_channel = Mock()        g.stop(parent)        shutdown.set.assert_called_with()        stopped.wait.assert_called_with()        close_chan.assert_called_with(parent)        assert g._node_stopped is None        assert g._node_shutdown is None        close_chan.reset()        g.stop(parent)        close_chan.assert_called_with(parent)    def test_resets(self):        parent = Mock()        g = gPidbox(parent)        g._resets = 100        g.reset()        assert g._resets == 101    def test_loop(self):        parent = Mock()        conn = self.app.connection_for_read()        parent.connection_for_read.return_value = conn        drain = conn.drain_events = Mock()        g = gPidbox(parent)        parent.connection = Mock()        do_reset = g._do_reset = Mock()        call_count = [0]        def se(*args, **kwargs):            if call_count[0] > 2:                g._node_shutdown.set()            g.reset()            call_count[0] += 1        drain.side_effect = se        g.loop(parent)        assert do_reset.call_count == 4class test_ControlPanel:    def setup(self):        self.panel = self.create_panel(consumer=Consumer(self.app))        @self.app.task(name='c.unittest.mytask', rate_limit=200, shared=False)        def mytask():            pass        self.mytask = mytask    def create_state(self, **kwargs):        kwargs.setdefault('app', self.app)        kwargs.setdefault('hostname', hostname)        kwargs.setdefault('tset', set)        return AttributeDict(kwargs)    def create_panel(self, **kwargs):        return self.app.control.mailbox.Node(            hostname=hostname,            state=self.create_state(**kwargs),            handlers=control.Panel.data,        )    def test_enable_events(self):        consumer = Consumer(self.app)        panel = self.create_panel(consumer=consumer)        evd = consumer.event_dispatcher        evd.groups = set()        panel.handle('enable_events')        assert not evd.groups        evd.groups = {'worker'}        panel.handle('enable_events')        assert 'task' in evd.groups        evd.groups = {'task'}        assert 'already enabled' in panel.handle('enable_events')['ok']    def test_disable_events(self):        consumer = Consumer(self.app)        panel = self.create_panel(consumer=consumer)        evd = consumer.event_dispatcher        evd.enabled = True        evd.groups = {'task'}        panel.handle('disable_events')        assert 'task' not in evd.groups        assert 'already disabled' in panel.handle('disable_events')['ok']    def test_clock(self):        consumer = Consumer(self.app)        panel = self.create_panel(consumer=consumer)        panel.state.app.clock.value = 313        x = panel.handle('clock')        assert x['clock'] == 313    def test_hello(self):        consumer = Consumer(self.app)        panel = self.create_panel(consumer=consumer)        panel.state.app.clock.value = 313        panel.state.hostname = 'elaine@vandelay.com'        worker_state.revoked.add('revoked1')        try:            assert panel.handle('hello', {                'from_node': 'elaine@vandelay.com',            }) is None            x = panel.handle('hello', {                'from_node': 'george@vandelay.com',            })            assert x['clock'] == 314  # incremented            x = panel.handle('hello', {                'from_node': 'george@vandelay.com',                'revoked': {'1234', '4567', '891'}            })            assert 'revoked1' in x['revoked']            assert '1234' in x['revoked']            assert '4567' in x['revoked']            assert '891' in x['revoked']            assert x['clock'] == 315  # incremented        finally:            worker_state.revoked.discard('revoked1')    def test_conf(self):        consumer = Consumer(self.app)        panel = self.create_panel(consumer=consumer)        panel.app = self.app        panel.app.finalize()        self.app.conf.some_key6 = 'hello world'        x = panel.handle('dump_conf')        assert 'some_key6' in x    def test_election(self):        consumer = Consumer(self.app)        panel = self.create_panel(consumer=consumer)        consumer.gossip = Mock()        panel.handle(            'election', {'id': 'id', 'topic': 'topic', 'action': 'action'},        )        consumer.gossip.election.assert_called_with('id', 'topic', 'action')    def test_election__no_gossip(self):        consumer = Mock(name='consumer')        consumer.gossip = None        panel = self.create_panel(consumer=consumer)        panel.handle(            'election', {'id': 'id', 'topic': 'topic', 'action': 'action'},        )    def test_heartbeat(self):        consumer = Consumer(self.app)        panel = self.create_panel(consumer=consumer)        event_dispatcher = consumer.event_dispatcher        event_dispatcher.enabled = True        panel.handle('heartbeat')        assert ('worker-heartbeat',) in event_dispatcher.send.call_args    def test_time_limit(self):        panel = self.create_panel(consumer=Mock())        r = panel.handle('time_limit', arguments={            'task_name': self.mytask.name, 'hard': 30, 'soft': 10})        assert self.mytask.time_limit == 30        assert self.mytask.soft_time_limit == 10        assert 'ok' in r        r = panel.handle('time_limit', arguments={            'task_name': self.mytask.name, 'hard': None, 'soft': None})        assert self.mytask.time_limit is None        assert self.mytask.soft_time_limit is None        assert 'ok' in r        r = panel.handle('time_limit', arguments={            'task_name': '248e8afya9s8dh921eh928', 'hard': 30})        assert 'error' in r    def test_active_queues(self):        import kombu        x = kombu.Consumer(self.app.connection_for_read(),                           [kombu.Queue('foo', kombu.Exchange('foo'), 'foo'),                            kombu.Queue('bar', kombu.Exchange('bar'), 'bar')],                           auto_declare=False)        consumer = Mock()        consumer.task_consumer = x        panel = self.create_panel(consumer=consumer)        r = panel.handle('active_queues')        assert list(sorted(q['name'] for q in r)) == ['bar', 'foo']    def test_active_queues__empty(self):        consumer = Mock(name='consumer')        panel = self.create_panel(consumer=consumer)        consumer.task_consumer = None        assert not panel.handle('active_queues')    def test_dump_tasks(self):        info = '\n'.join(self.panel.handle('dump_tasks'))        assert 'mytask' in info        assert 'rate_limit=200' in info    def test_dump_tasks2(self):        prev, control.DEFAULT_TASK_INFO_ITEMS = (            control.DEFAULT_TASK_INFO_ITEMS, [])        try:            info = '\n'.join(self.panel.handle('dump_tasks'))            assert 'mytask' in info            assert 'rate_limit=200' not in info        finally:            control.DEFAULT_TASK_INFO_ITEMS = prev    def test_stats(self):        prev_count, worker_state.total_count = worker_state.total_count, 100        try:            assert self.panel.handle('stats')['total'] == 100        finally:            worker_state.total_count = prev_count    def test_report(self):        self.panel.handle('report')    def test_active(self):        r = Request(            self.TaskMessage(self.mytask.name, 'do re mi'),            app=self.app,        )        worker_state.active_requests.add(r)        try:            assert self.panel.handle('dump_active')        finally:            worker_state.active_requests.discard(r)    def test_pool_grow(self):        class MockPool(object):            def __init__(self, size=1):                self.size = size            def grow(self, n=1):                self.size += n            def shrink(self, n=1):                self.size -= n            @property            def num_processes(self):                return self.size        consumer = Consumer(self.app)        consumer.prefetch_multiplier = 8        consumer.qos = Mock(name='qos')        consumer.pool = MockPool(1)        panel = self.create_panel(consumer=consumer)        panel.handle('pool_grow')        assert consumer.pool.size == 2        consumer.qos.increment_eventually.assert_called_with(8)        assert consumer.initial_prefetch_count == 16        panel.handle('pool_shrink')        assert consumer.pool.size == 1        consumer.qos.decrement_eventually.assert_called_with(8)        assert consumer.initial_prefetch_count == 8        panel.state.consumer = Mock()        panel.state.consumer.controller = Mock()        sc = panel.state.consumer.controller.autoscaler = Mock()        panel.handle('pool_grow')        sc.force_scale_up.assert_called()        panel.handle('pool_shrink')        sc.force_scale_down.assert_called()    def test_add__cancel_consumer(self):        class MockConsumer(object):            queues = []            canceled = []            consuming = False            hub = Mock(name='hub')            def add_queue(self, queue):                self.queues.append(queue.name)            def consume(self):                self.consuming = True            def cancel_by_queue(self, queue):                self.canceled.append(queue)            def consuming_from(self, queue):                return queue in self.queues        consumer = Consumer(self.app)        consumer.task_consumer = MockConsumer()        panel = self.create_panel(consumer=consumer)        panel.handle('add_consumer', {'queue': 'MyQueue'})        assert 'MyQueue' in consumer.task_consumer.queues        assert consumer.task_consumer.consuming        panel.handle('add_consumer', {'queue': 'MyQueue'})        panel.handle('cancel_consumer', {'queue': 'MyQueue'})        assert 'MyQueue' in consumer.task_consumer.canceled    def test_revoked(self):        worker_state.revoked.clear()        worker_state.revoked.add('a1')        worker_state.revoked.add('a2')        try:            assert sorted(self.panel.handle('dump_revoked')) == ['a1', 'a2']        finally:            worker_state.revoked.clear()    def test_dump_schedule(self):        consumer = Consumer(self.app)        panel = self.create_panel(consumer=consumer)        assert not panel.handle('dump_schedule')        r = Request(            self.TaskMessage(self.mytask.name, 'CAFEBABE'),            app=self.app,        )        consumer.timer.schedule.enter_at(            consumer.timer.Entry(lambda x: x, (r,)),            datetime.now() + timedelta(seconds=10))        consumer.timer.schedule.enter_at(            consumer.timer.Entry(lambda x: x, (object(),)),            datetime.now() + timedelta(seconds=10))        assert panel.handle('dump_schedule')    def test_dump_reserved(self):        consumer = Consumer(self.app)        req = Request(            self.TaskMessage(self.mytask.name, args=(2, 2)), app=self.app,        )  # ^ need to keep reference for reserved_tasks WeakSet.        worker_state.task_reserved(req)        try:            panel = self.create_panel(consumer=consumer)            response = panel.handle('dump_reserved', {'safe': True})            assert response[0]['name'] == self.mytask.name            assert response[0]['hostname'] == socket.gethostname()            worker_state.reserved_requests.clear()            assert not panel.handle('dump_reserved')        finally:            worker_state.reserved_requests.clear()    def test_rate_limit_invalid_rate_limit_string(self):        e = self.panel.handle('rate_limit', arguments={            'task_name': 'tasks.add', 'rate_limit': 'x1240301#%!'})        assert 'Invalid rate limit string' in e.get('error')    def test_rate_limit(self):        class xConsumer(object):            reset = False            def reset_rate_limits(self):                self.reset = True        consumer = xConsumer()        panel = self.create_panel(app=self.app, consumer=consumer)        task = self.app.tasks[self.mytask.name]        panel.handle('rate_limit', arguments={'task_name': task.name,                                              'rate_limit': '100/m'})        assert task.rate_limit == '100/m'        assert consumer.reset        consumer.reset = False        panel.handle('rate_limit', arguments={            'task_name': task.name,            'rate_limit': 0,        })        assert task.rate_limit == 0        assert consumer.reset    def test_rate_limit_nonexistant_task(self):        self.panel.handle('rate_limit', arguments={            'task_name': 'xxxx.does.not.exist',            'rate_limit': '1000/s'})    def test_unexposed_command(self):        with pytest.raises(KeyError):            self.panel.handle('foo', arguments={})    def test_revoke_with_name(self):        tid = uuid()        m = {            'method': 'revoke',            'destination': hostname,            'arguments': {                'task_id': tid,                'task_name': self.mytask.name,            },        }        self.panel.handle_message(m, None)        assert tid in revoked    def test_revoke_with_name_not_in_registry(self):        tid = uuid()        m = {            'method': 'revoke',            'destination': hostname,            'arguments': {                'task_id': tid,                'task_name': 'xxxxxxxxx33333333388888',            },        }        self.panel.handle_message(m, None)        assert tid in revoked    def test_revoke(self):        tid = uuid()        m = {            'method': 'revoke',            'destination': hostname,            'arguments': {                'task_id': tid,            },        }        self.panel.handle_message(m, None)        assert tid in revoked        m = {            'method': 'revoke',            'destination': 'does.not.exist',            'arguments': {                'task_id': tid + 'xxx',            },        }        self.panel.handle_message(m, None)        assert tid + 'xxx' not in revoked    def test_revoke_terminate(self):        request = Mock()        request.id = tid = uuid()        state = self.create_state()        state.consumer = Mock()        worker_state.task_reserved(request)        try:            r = control.revoke(state, tid, terminate=True)            assert tid in revoked            assert request.terminate.call_count            assert 'terminate:' in r['ok']            # unknown task id only revokes            r = control.revoke(state, uuid(), terminate=True)            assert 'tasks unknown' in r['ok']        finally:            worker_state.task_ready(request)    def test_autoscale(self):        self.panel.state.consumer = Mock()        self.panel.state.consumer.controller = Mock()        sc = self.panel.state.consumer.controller.autoscaler = Mock()        sc.update.return_value = 10, 2        m = {'method': 'autoscale',             'destination': hostname,             'arguments': {'max': '10', 'min': '2'}}        r = self.panel.handle_message(m, None)        assert 'ok' in r        self.panel.state.consumer.controller.autoscaler = None        r = self.panel.handle_message(m, None)        assert 'error' in r    def test_ping(self):        m = {'method': 'ping',             'destination': hostname}        r = self.panel.handle_message(m, None)        assert r == {'ok': 'pong'}    def test_shutdown(self):        m = {'method': 'shutdown',             'destination': hostname}        with pytest.raises(SystemExit):            self.panel.handle_message(m, None)    def test_panel_reply(self):        replies = []        class _Node(pidbox.Node):            def reply(self, data, exchange, routing_key, **kwargs):                replies.append(data)        panel = _Node(            hostname=hostname,            state=self.create_state(consumer=Consumer(self.app)),            handlers=control.Panel.data,            mailbox=self.app.control.mailbox,        )        r = panel.dispatch('ping', reply_to={            'exchange': 'x',            'routing_key': 'x',        })        assert r == {'ok': 'pong'}        assert replies[0] == {panel.hostname: {'ok': 'pong'}}    def test_pool_restart(self):        consumer = Consumer(self.app)        consumer.controller = _WC(app=self.app)        consumer.controller.consumer = consumer        consumer.controller.pool.restart = Mock()        consumer.reset_rate_limits = Mock(name='reset_rate_limits()')        consumer.update_strategies = Mock(name='update_strategies()')        consumer.event_dispatcher = Mock(name='evd')        panel = self.create_panel(consumer=consumer)        assert panel.state.consumer.controller.consumer is consumer        panel.app = self.app        _import = panel.app.loader.import_from_cwd = Mock()        _reload = Mock()        with pytest.raises(ValueError):            panel.handle('pool_restart', {'reloader': _reload})        self.app.conf.worker_pool_restarts = True        panel.handle('pool_restart', {'reloader': _reload})        consumer.controller.pool.restart.assert_called()        consumer.reset_rate_limits.assert_called_with()        consumer.update_strategies.assert_called_with()        _reload.assert_not_called()        _import.assert_not_called()        consumer.controller.pool.restart.side_effect = NotImplementedError()        panel.handle('pool_restart', {'reloader': _reload})        consumer.controller.consumer = None        panel.handle('pool_restart', {'reloader': _reload})    @patch('celery.worker.worker.logger.debug')    def test_pool_restart_import_modules(self, _debug):        consumer = Consumer(self.app)        consumer.controller = _WC(app=self.app)        consumer.controller.consumer = consumer        consumer.controller.pool.restart = Mock()        consumer.reset_rate_limits = Mock(name='reset_rate_limits()')        consumer.update_strategies = Mock(name='update_strategies()')        panel = self.create_panel(consumer=consumer)        panel.app = self.app        assert panel.state.consumer.controller.consumer is consumer        _import = consumer.controller.app.loader.import_from_cwd = Mock()        _reload = Mock()        self.app.conf.worker_pool_restarts = True        with patch('sys.modules'):            panel.handle('pool_restart', {                'modules': ['foo', 'bar'],                'reloader': _reload,            })        consumer.controller.pool.restart.assert_called()        consumer.reset_rate_limits.assert_called_with()        consumer.update_strategies.assert_called_with()        _reload.assert_not_called()        _import.assert_has_calls([call('bar'), call('foo')], any_order=True)        assert _import.call_count == 2    def test_pool_restart_reload_modules(self):        consumer = Consumer(self.app)        consumer.controller = _WC(app=self.app)        consumer.controller.consumer = consumer        consumer.controller.pool.restart = Mock()        consumer.reset_rate_limits = Mock(name='reset_rate_limits()')        consumer.update_strategies = Mock(name='update_strategies()')        panel = self.create_panel(consumer=consumer)        panel.app = self.app        _import = panel.app.loader.import_from_cwd = Mock()        _reload = Mock()        self.app.conf.worker_pool_restarts = True        with patch.dict(sys.modules, {'foo': None}):            panel.handle('pool_restart', {                'modules': ['foo'],                'reload': False,                'reloader': _reload,            })            consumer.controller.pool.restart.assert_called()            _reload.assert_not_called()            _import.assert_not_called()            _import.reset_mock()            _reload.reset_mock()            consumer.controller.pool.restart.reset_mock()            panel.handle('pool_restart', {                'modules': ['foo'],                'reload': True,                'reloader': _reload,            })            consumer.controller.pool.restart.assert_called()            _reload.assert_called()            _import.assert_not_called()    def test_query_task(self):        consumer = Consumer(self.app)        consumer.controller = _WC(app=self.app)        consumer.controller.consumer = consumer        panel = self.create_panel(consumer=consumer)        panel.app = self.app        req1 = Request(            self.TaskMessage(self.mytask.name, args=(2, 2)),            app=self.app,        )        worker_state.task_reserved(req1)        try:            assert not panel.handle('query_task', {'ids': {'1daa'}})            ret = panel.handle('query_task', {'ids': {req1.id}})            assert req1.id in ret            assert ret[req1.id][0] == 'reserved'            worker_state.active_requests.add(req1)            try:                ret = panel.handle('query_task', {'ids': {req1.id}})                assert ret[req1.id][0] == 'active'            finally:                worker_state.active_requests.clear()            ret = panel.handle('query_task', {'ids': {req1.id}})            assert ret[req1.id][0] == 'reserved'        finally:            worker_state.reserved_requests.clear()
 |