| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 | from __future__ import absolute_import, unicode_literalsimport pytestfrom pickle import loads, dumpsfrom datetime import datetimefrom case import Mock, mockfrom celery import statesfrom celery.exceptions import ImproperlyConfiguredfrom celery.utils.objects import BunchCASSANDRA_MODULES = ['cassandra', 'cassandra.auth', 'cassandra.cluster']@mock.module(*CASSANDRA_MODULES)class test_CassandraBackend:    def setup(self):        self.app.conf.update(            cassandra_servers=['example.com'],            cassandra_keyspace='celery',            cassandra_table='task_results',        )    def test_init_no_cassandra(self, *modules):        # should raise ImproperlyConfigured when no python-driver        # installed.        from celery.backends import cassandra as mod        prev, mod.cassandra = mod.cassandra, None        try:            with pytest.raises(ImproperlyConfigured):                mod.CassandraBackend(app=self.app)        finally:            mod.cassandra = prev    def test_init_with_and_without_LOCAL_QUROM(self, *modules):        from celery.backends import cassandra as mod        mod.cassandra = Mock()        cons = mod.cassandra.ConsistencyLevel = Bunch(            LOCAL_QUORUM='foo',        )        self.app.conf.cassandra_read_consistency = 'LOCAL_FOO'        self.app.conf.cassandra_write_consistency = 'LOCAL_FOO'        mod.CassandraBackend(app=self.app)        cons.LOCAL_FOO = 'bar'        mod.CassandraBackend(app=self.app)        # no servers raises ImproperlyConfigured        with pytest.raises(ImproperlyConfigured):            self.app.conf.cassandra_servers = None            mod.CassandraBackend(                app=self.app, keyspace='b', column_family='c',            )    @pytest.mark.usefixtures('depends_on_current_app')    def test_reduce(self, *modules):        from celery.backends.cassandra import CassandraBackend        assert loads(dumps(CassandraBackend(app=self.app)))    def test_get_task_meta_for(self, *modules):        from celery.backends import cassandra as mod        mod.cassandra = Mock()        x = mod.CassandraBackend(app=self.app)        x._connection = True        session = x._session = Mock()        execute = session.execute = Mock()        execute.return_value = [            [states.SUCCESS, '1', datetime.now(), b'', b'']        ]        x.decode = Mock()        meta = x._get_task_meta_for('task_id')        assert meta['status'] == states.SUCCESS        x._session.execute.return_value = []        meta = x._get_task_meta_for('task_id')        assert meta['status'] == states.PENDING    def test_store_result(self, *modules):        from celery.backends import cassandra as mod        mod.cassandra = Mock()        x = mod.CassandraBackend(app=self.app)        x._connection = True        session = x._session = Mock()        session.execute = Mock()        x._store_result('task_id', 'result', states.SUCCESS)    def test_process_cleanup(self, *modules):        from celery.backends import cassandra as mod        x = mod.CassandraBackend(app=self.app)        x.process_cleanup()        assert x._connection is None        assert x._session is None    def test_timeouting_cluster(self):        # Tests behavior when Cluster.connect raises        # cassandra.OperationTimedOut.        from celery.backends import cassandra as mod        class OTOExc(Exception):            pass        class VeryFaultyCluster(object):            def __init__(self, *args, **kwargs):                pass            def connect(self, *args, **kwargs):                raise OTOExc()            def shutdown(self):                pass        mod.cassandra = Mock()        mod.cassandra.OperationTimedOut = OTOExc        mod.cassandra.cluster = Mock()        mod.cassandra.cluster.Cluster = VeryFaultyCluster        x = mod.CassandraBackend(app=self.app)        with pytest.raises(OTOExc):            x._store_result('task_id', 'result', states.SUCCESS)        assert x._connection is None        assert x._session is None        x.process_cleanup()  # shouldn't raise    def test_please_free_memory(self):        # Ensure that Cluster object IS shut down.        from celery.backends import cassandra as mod        class RAMHoggingCluster(object):            objects_alive = 0            def __init__(self, *args, **kwargs):                pass            def connect(self, *args, **kwargs):                RAMHoggingCluster.objects_alive += 1                return Mock()            def shutdown(self):                RAMHoggingCluster.objects_alive -= 1        mod.cassandra = Mock()        mod.cassandra.cluster = Mock()        mod.cassandra.cluster.Cluster = RAMHoggingCluster        for x in range(0, 10):            x = mod.CassandraBackend(app=self.app)            x._store_result('task_id', 'result', states.SUCCESS)            x.process_cleanup()        assert RAMHoggingCluster.objects_alive == 0    def test_auth_provider(self):        # Ensure valid auth_provider works properly, and invalid one raises        # ImproperlyConfigured exception.        from celery.backends import cassandra as mod        class DummyAuth(object):            ValidAuthProvider = Mock()        mod.cassandra = Mock()        mod.cassandra.auth = DummyAuth        # Valid auth_provider        self.app.conf.cassandra_auth_provider = 'ValidAuthProvider'        self.app.conf.cassandra_auth_kwargs = {            'username': 'stuff'        }        mod.CassandraBackend(app=self.app)        # Invalid auth_provider        self.app.conf.cassandra_auth_provider = 'SpiderManAuth'        self.app.conf.cassandra_auth_kwargs = {            'username': 'Jack'        }        with pytest.raises(ImproperlyConfigured):            mod.CassandraBackend(app=self.app)    def test_options(self):        # Ensure valid options works properly        from celery.backends import cassandra as mod        mod.cassandra = Mock()        # Valid options        self.app.conf.cassandra_options = {            'cql_version': '3.2.1',            'protocol_version': 3        }        mod.CassandraBackend(app=self.app)
 |