|  | @@ -9,7 +9,6 @@
 | 
	
		
			
				|  |  |  from __future__ import absolute_import
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import sys
 | 
	
		
			
				|  |  | -import six
 | 
	
		
			
				|  |  |  try:  # pragma: no cover
 | 
	
		
			
				|  |  |      import cassandra
 | 
	
		
			
				|  |  |  except ImportError:  # pragma: no cover
 | 
	
	
		
			
				|  | @@ -55,13 +54,17 @@ class NewCassandraBackend(BaseBackend):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          conf = self.app.conf
 | 
	
		
			
				|  |  |          self.servers = (servers or
 | 
	
		
			
				|  |  | -                        conf.get('CASSANDRA_SERVERS'))
 | 
	
		
			
				|  |  | +                        conf.get('CASSANDRA_SERVERS', None))
 | 
	
		
			
				|  |  |          self.port = (port or
 | 
	
		
			
				|  |  | -                     conf.get('CASSANDRA_PORT'))
 | 
	
		
			
				|  |  | +                     conf.get('CASSANDRA_PORT', None))
 | 
	
		
			
				|  |  |          self.keyspace = (keyspace or
 | 
	
		
			
				|  |  | -                         conf.get('CASSANDRA_KEYSPACE'))
 | 
	
		
			
				|  |  | +                         conf.get('CASSANDRA_KEYSPACE', None))
 | 
	
		
			
				|  |  |          self.table = (table or
 | 
	
		
			
				|  |  | -                      conf.get('CASSANDRA_TABLE'))
 | 
	
		
			
				|  |  | +                      conf.get('CASSANDRA_TABLE', None))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if not self.servers or not self.keyspace or not self.table:
 | 
	
		
			
				|  |  | +            raise ImproperlyConfigured('Cassandra backend not configured.')
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          expires = (entry_ttl or conf.get('CASSANDRA_ENTRY_TTL', None))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          if expires is not None:
 | 
	
	
		
			
				|  | @@ -71,20 +74,11 @@ class NewCassandraBackend(BaseBackend):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          read_cons = conf.get('CASSANDRA_READ_CONSISTENCY') or 'LOCAL_QUORUM'
 | 
	
		
			
				|  |  |          write_cons = conf.get('CASSANDRA_WRITE_CONSISTENCY') or 'LOCAL_QUORUM'
 | 
	
		
			
				|  |  | -        try:
 | 
	
		
			
				|  |  | -            self.read_consistency = getattr(cassandra.ConsistencyLevel,
 | 
	
		
			
				|  |  | -                                            read_cons)
 | 
	
		
			
				|  |  | -        except AttributeError:
 | 
	
		
			
				|  |  | -            self.read_consistency = cassandra.ConsistencyLevel.LOCAL_QUORUM
 | 
	
		
			
				|  |  | -        try:
 | 
	
		
			
				|  |  | -            self.write_consistency = getattr(cassandra.ConsistencyLevel,
 | 
	
		
			
				|  |  | -                                             write_cons)
 | 
	
		
			
				|  |  | -        except AttributeError:
 | 
	
		
			
				|  |  | -            self.write_consistency = cassandra.ConsistencyLevel.LOCAL_QUORUM
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        if not self.servers or not self.keyspace or not self.table:
 | 
	
		
			
				|  |  | -            raise ImproperlyConfigured(
 | 
	
		
			
				|  |  | -                'Cassandra backend not configured.')
 | 
	
		
			
				|  |  | +        self.read_consistency = getattr(cassandra.ConsistencyLevel,
 | 
	
		
			
				|  |  | +            read_cons, cassandra.ConsistencyLevel.LOCAL_QUORUM)
 | 
	
		
			
				|  |  | +        self.write_consistency = getattr(cassandra.ConsistencyLevel,
 | 
	
		
			
				|  |  | +            write_cons, cassandra.ConsistencyLevel.LOCAL_QUORUM)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          self._connection = None
 | 
	
		
			
				|  |  |          self._session = None
 | 
	
	
		
			
				|  | @@ -108,8 +102,10 @@ class NewCassandraBackend(BaseBackend):
 | 
	
		
			
				|  |  |                                                           port=self.port)
 | 
	
		
			
				|  |  |              self._session = self._connection.connect(self.keyspace)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +            # We are forced to do concatenation below, as formatting would
 | 
	
		
			
				|  |  | +            # blow up on superficial %s that will be processed by Cassandra
 | 
	
		
			
				|  |  |              self._write_stmt = cassandra.query.SimpleStatement(
 | 
	
		
			
				|  |  | -                'INSERT INTO '+self.table+' (task_id, status, result,'''
 | 
	
		
			
				|  |  | +                'INSERT INTO %s (task_id, status, result,'''
 | 
	
		
			
				|  |  |                  ''' date_done, traceback, children) VALUES'''
 | 
	
		
			
				|  |  |                  ' (%s, %s, %s, %s, %s, %s) '+self.cqlexpires+';')
 | 
	
		
			
				|  |  |              self._write_stmt.consistency_level = self.write_consistency
 | 
	
	
		
			
				|  | @@ -151,8 +147,7 @@ class NewCassandraBackend(BaseBackend):
 | 
	
		
			
				|  |  |          """Store return value and status of an executed task."""
 | 
	
		
			
				|  |  |          self._get_connection(write=True)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        import sys
 | 
	
		
			
				|  |  | -        if six.PY3:
 | 
	
		
			
				|  |  | +        if sys.version_info >= (3,):
 | 
	
		
			
				|  |  |              buf = lambda x: bytes(x, 'utf8')
 | 
	
		
			
				|  |  |          else:
 | 
	
		
			
				|  |  |              buf = buffer
 |