|| from __future__ import absolute_import, unicode_literalsimport pytestfrom kombu.pidbox import Mailboxfrom vine.utils import wrapsfrom celery import uuidfrom celery.app import controlfrom celery.exceptions import DuplicateNodenameWarningclass MockMailbox(Mailbox):    sent = []    def _publish(self, command, *args, **kwargs):        self.__class__.sent.append(command)    def close(self):        pass    def _collect(self, *args, **kwargs):        passclass Control(control.Control):    Mailbox = MockMailboxdef with_mock_broadcast(fun):    @wraps(fun)    def _resets(*args, **kwargs):        MockMailbox.sent = []        try:            return fun(*args, **kwargs)        finally:            MockMailbox.sent = []    return _resetsclass test_flatten_reply:    def test_flatten_reply(self):        reply = [            {'foo@example.com': {'hello': 10}},            {'foo@example.com': {'hello': 20}},            {'bar@example.com': {'hello': 30}}        ]        with pytest.warns(DuplicateNodenameWarning) as w:            nodes = control.flatten_reply(reply)        assert 'Received multiple replies from node name: {0}.'.format(            next(iter(reply[0]))) in str(w[0].message.args[0])        assert 'foo@example.com' in nodes        assert 'bar@example.com' in nodesclass test_inspect:    def setup(self):        self.c = Control(app=self.app)        self.prev, self.app.control = self.app.control, self.c        self.i = self.c.inspect()    def test_prepare_reply(self):        reply = self.i._prepare([            {'w1': {'ok': 1}},            {'w2': {'ok': 1}},        ])        assert reply == {            'w1': {'ok': 1},            'w2': {'ok': 1},        }        i = self.c.inspect(destination='w1')        assert i._prepare([{'w1': {'ok': 1}}]) == {'ok': 1}    @with_mock_broadcast    def test_active(self):        self.i.active()        assert 'active' in MockMailbox.sent    @with_mock_broadcast    def test_clock(self):        self.i.clock()        assert 'clock' in MockMailbox.sent    @with_mock_broadcast    def test_conf(self):        self.i.conf()        assert 'conf' in MockMailbox.sent    @with_mock_broadcast    def test_hello(self):        self.i.hello('george@vandelay.com')        assert 'hello' in MockMailbox.sent    @with_mock_broadcast    def test_memsample(self):        self.i.memsample()        assert 'memsample' in MockMailbox.sent    @with_mock_broadcast    def test_memdump(self):        self.i.memdump()        assert 'memdump' in MockMailbox.sent    @with_mock_broadcast    def test_objgraph(self):        self.i.objgraph()        assert 'objgraph' in MockMailbox.sent    @with_mock_broadcast    def test_scheduled(self):        self.i.scheduled()        assert 'scheduled' in MockMailbox.sent    @with_mock_broadcast    def test_reserved(self):        self.i.reserved()        assert 'reserved' in MockMailbox.sent    @with_mock_broadcast    def test_stats(self):        self.i.stats()        assert 'stats' in MockMailbox.sent    @with_mock_broadcast    def test_revoked(self):        self.i.revoked()        assert 'revoked' in MockMailbox.sent    @with_mock_broadcast    def test_tasks(self):        self.i.registered()        assert 'registered' in MockMailbox.sent    @with_mock_broadcast    def test_ping(self):        self.i.ping()        assert 'ping' in MockMailbox.sent    @with_mock_broadcast    def test_active_queues(self):        self.i.active_queues()        assert 'active_queues' in MockMailbox.sent    @with_mock_broadcast    def test_report(self):        self.i.report()        assert 'report' in MockMailbox.sentclass test_Broadcast:    def setup(self):        self.control = Control(app=self.app)        self.app.control = self.control        @self.app.task(shared=False)        def mytask():            pass        self.mytask = mytask    def test_purge(self):        self.control.purge()    @with_mock_broadcast    def test_broadcast(self):        self.control.broadcast('foobarbaz', arguments=[])        assert 'foobarbaz' in MockMailbox.sent    @with_mock_broadcast    def test_broadcast_limit(self):        self.control.broadcast(            'foobarbaz1', arguments=[], limit=None, destination=[1, 2, 3],        )        assert 'foobarbaz1' in MockMailbox.sent    @with_mock_broadcast    def test_broadcast_validate(self):        with pytest.raises(ValueError):            self.control.broadcast('foobarbaz2',                                   destination='foo')    @with_mock_broadcast    def test_rate_limit(self):        self.control.rate_limit(self.mytask.name, '100/m')        assert 'rate_limit' in MockMailbox.sent    @with_mock_broadcast    def test_time_limit(self):        self.control.time_limit(self.mytask.name, soft=10, hard=20)        assert 'time_limit' in MockMailbox.sent    @with_mock_broadcast    def test_add_consumer(self):        self.control.add_consumer('foo')        assert 'add_consumer' in MockMailbox.sent    @with_mock_broadcast    def test_cancel_consumer(self):        self.control.cancel_consumer('foo')        assert 'cancel_consumer' in MockMailbox.sent    @with_mock_broadcast    def test_enable_events(self):        self.control.enable_events()        assert 'enable_events' in MockMailbox.sent    @with_mock_broadcast    def test_disable_events(self):        self.control.disable_events()        assert 'disable_events' in MockMailbox.sent    @with_mock_broadcast    def test_revoke(self):        self.control.revoke('foozbaaz')        assert 'revoke' in MockMailbox.sent    @with_mock_broadcast    def test_ping(self):        self.control.ping()        assert 'ping' in MockMailbox.sent    @with_mock_broadcast    def test_election(self):        self.control.election('some_id', 'topic', 'action')        assert 'election' in MockMailbox.sent    @with_mock_broadcast    def test_pool_grow(self):        self.control.pool_grow(2)        assert 'pool_grow' in MockMailbox.sent    @with_mock_broadcast    def test_pool_shrink(self):        self.control.pool_shrink(2)        assert 'pool_shrink' in MockMailbox.sent    @with_mock_broadcast    def test_revoke_from_result(self):        self.app.AsyncResult('foozbazzbar').revoke()        assert 'revoke' in MockMailbox.sent    @with_mock_broadcast    def test_revoke_from_resultset(self):        r = self.app.GroupResult(uuid(),                                 [self.app.AsyncResult(x)                                  for x in [uuid() for i in range(10)]])        r.revoke()        assert 'revoke' in MockMailbox.sent
 |