123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- from __future__ import absolute_import, unicode_literals
- from datetime import datetime
- from pickle import dumps, loads
- import pytest
- from case import Mock, mock
- from celery import states
- from celery.exceptions import ImproperlyConfigured
- from celery.utils.objects import Bunch
- CASSANDRA_MODULES = ['cassandra', 'cassandra.auth', 'cassandra.cluster']
- @mock.module(*CASSANDRA_MODULES)
- class test_CassandraBackend:
- def setup(self):
- self.app.conf.update(
- cassandra_servers=['example.com'],
- cassandra_keyspace='celery',
- cassandra_table='task_results',
- )
- def test_init_no_cassandra(self, *modules):
- # should raise ImproperlyConfigured when no python-driver
- # installed.
- from celery.backends import cassandra as mod
- prev, mod.cassandra = mod.cassandra, None
- try:
- with pytest.raises(ImproperlyConfigured):
- mod.CassandraBackend(app=self.app)
- finally:
- mod.cassandra = prev
- def test_init_with_and_without_LOCAL_QUROM(self, *modules):
- from celery.backends import cassandra as mod
- mod.cassandra = Mock()
- cons = mod.cassandra.ConsistencyLevel = Bunch(
- LOCAL_QUORUM='foo',
- )
- self.app.conf.cassandra_read_consistency = 'LOCAL_FOO'
- self.app.conf.cassandra_write_consistency = 'LOCAL_FOO'
- mod.CassandraBackend(app=self.app)
- cons.LOCAL_FOO = 'bar'
- mod.CassandraBackend(app=self.app)
- # no servers raises ImproperlyConfigured
- with pytest.raises(ImproperlyConfigured):
- self.app.conf.cassandra_servers = None
- mod.CassandraBackend(
- app=self.app, keyspace='b', column_family='c',
- )
- @pytest.mark.usefixtures('depends_on_current_app')
- def test_reduce(self, *modules):
- from celery.backends.cassandra import CassandraBackend
- assert loads(dumps(CassandraBackend(app=self.app)))
- def test_get_task_meta_for(self, *modules):
- from celery.backends import cassandra as mod
- mod.cassandra = Mock()
- x = mod.CassandraBackend(app=self.app)
- x._connection = True
- session = x._session = Mock()
- execute = session.execute = Mock()
- execute.return_value = [
- [states.SUCCESS, '1', datetime.now(), b'', b'']
- ]
- x.decode = Mock()
- meta = x._get_task_meta_for('task_id')
- assert meta['status'] == states.SUCCESS
- x._session.execute.return_value = []
- meta = x._get_task_meta_for('task_id')
- assert meta['status'] == states.PENDING
- def test_store_result(self, *modules):
- from celery.backends import cassandra as mod
- mod.cassandra = Mock()
- x = mod.CassandraBackend(app=self.app)
- x._connection = True
- session = x._session = Mock()
- session.execute = Mock()
- x._store_result('task_id', 'result', states.SUCCESS)
- def test_process_cleanup(self, *modules):
- from celery.backends import cassandra as mod
- x = mod.CassandraBackend(app=self.app)
- x.process_cleanup()
- assert x._connection is None
- assert x._session is None
- def test_timeouting_cluster(self):
- # Tests behavior when Cluster.connect raises
- # cassandra.OperationTimedOut.
- from celery.backends import cassandra as mod
- class OTOExc(Exception):
- pass
- class VeryFaultyCluster(object):
- def __init__(self, *args, **kwargs):
- pass
- def connect(self, *args, **kwargs):
- raise OTOExc()
- def shutdown(self):
- pass
- mod.cassandra = Mock()
- mod.cassandra.OperationTimedOut = OTOExc
- mod.cassandra.cluster = Mock()
- mod.cassandra.cluster.Cluster = VeryFaultyCluster
- x = mod.CassandraBackend(app=self.app)
- with pytest.raises(OTOExc):
- x._store_result('task_id', 'result', states.SUCCESS)
- assert x._connection is None
- assert x._session is None
- x.process_cleanup() # shouldn't raise
- def test_please_free_memory(self):
- # Ensure that Cluster object IS shut down.
- from celery.backends import cassandra as mod
- class RAMHoggingCluster(object):
- objects_alive = 0
- def __init__(self, *args, **kwargs):
- pass
- def connect(self, *args, **kwargs):
- RAMHoggingCluster.objects_alive += 1
- return Mock()
- def shutdown(self):
- RAMHoggingCluster.objects_alive -= 1
- mod.cassandra = Mock()
- mod.cassandra.cluster = Mock()
- mod.cassandra.cluster.Cluster = RAMHoggingCluster
- for x in range(0, 10):
- x = mod.CassandraBackend(app=self.app)
- x._store_result('task_id', 'result', states.SUCCESS)
- x.process_cleanup()
- assert RAMHoggingCluster.objects_alive == 0
- def test_auth_provider(self):
- # Ensure valid auth_provider works properly, and invalid one raises
- # ImproperlyConfigured exception.
- from celery.backends import cassandra as mod
- class DummyAuth(object):
- ValidAuthProvider = Mock()
- mod.cassandra = Mock()
- mod.cassandra.auth = DummyAuth
- # Valid auth_provider
- self.app.conf.cassandra_auth_provider = 'ValidAuthProvider'
- self.app.conf.cassandra_auth_kwargs = {
- 'username': 'stuff'
- }
- mod.CassandraBackend(app=self.app)
- # Invalid auth_provider
- self.app.conf.cassandra_auth_provider = 'SpiderManAuth'
- self.app.conf.cassandra_auth_kwargs = {
- 'username': 'Jack'
- }
- with pytest.raises(ImproperlyConfigured):
- mod.CassandraBackend(app=self.app)
- def test_options(self):
- # Ensure valid options works properly
- from celery.backends import cassandra as mod
- mod.cassandra = Mock()
- # Valid options
- self.app.conf.cassandra_options = {
- 'cql_version': '3.2.1',
- 'protocol_version': 3
- }
- mod.CassandraBackend(app=self.app)
|