|
@@ -9,7 +9,6 @@
|
|
|
from __future__ import absolute_import
|
|
|
|
|
|
import sys
|
|
|
-import six
|
|
|
try: # pragma: no cover
|
|
|
import cassandra
|
|
|
except ImportError: # pragma: no cover
|
|
@@ -55,13 +54,17 @@ class NewCassandraBackend(BaseBackend):
|
|
|
|
|
|
conf = self.app.conf
|
|
|
self.servers = (servers or
|
|
|
- conf.get('CASSANDRA_SERVERS'))
|
|
|
+ conf.get('CASSANDRA_SERVERS', None))
|
|
|
self.port = (port or
|
|
|
- conf.get('CASSANDRA_PORT'))
|
|
|
+ conf.get('CASSANDRA_PORT', None))
|
|
|
self.keyspace = (keyspace or
|
|
|
- conf.get('CASSANDRA_KEYSPACE'))
|
|
|
+ conf.get('CASSANDRA_KEYSPACE', None))
|
|
|
self.table = (table or
|
|
|
- conf.get('CASSANDRA_TABLE'))
|
|
|
+ conf.get('CASSANDRA_TABLE', None))
|
|
|
+
|
|
|
+ if not self.servers or not self.keyspace or not self.table:
|
|
|
+ raise ImproperlyConfigured('Cassandra backend not configured.')
|
|
|
+
|
|
|
expires = (entry_ttl or conf.get('CASSANDRA_ENTRY_TTL', None))
|
|
|
|
|
|
if expires is not None:
|
|
@@ -71,20 +74,11 @@ class NewCassandraBackend(BaseBackend):
|
|
|
|
|
|
read_cons = conf.get('CASSANDRA_READ_CONSISTENCY') or 'LOCAL_QUORUM'
|
|
|
write_cons = conf.get('CASSANDRA_WRITE_CONSISTENCY') or 'LOCAL_QUORUM'
|
|
|
- try:
|
|
|
- self.read_consistency = getattr(cassandra.ConsistencyLevel,
|
|
|
- read_cons)
|
|
|
- except AttributeError:
|
|
|
- self.read_consistency = cassandra.ConsistencyLevel.LOCAL_QUORUM
|
|
|
- try:
|
|
|
- self.write_consistency = getattr(cassandra.ConsistencyLevel,
|
|
|
- write_cons)
|
|
|
- except AttributeError:
|
|
|
- self.write_consistency = cassandra.ConsistencyLevel.LOCAL_QUORUM
|
|
|
|
|
|
- if not self.servers or not self.keyspace or not self.table:
|
|
|
- raise ImproperlyConfigured(
|
|
|
- 'Cassandra backend not configured.')
|
|
|
+ self.read_consistency = getattr(cassandra.ConsistencyLevel,
|
|
|
+ read_cons, cassandra.ConsistencyLevel.LOCAL_QUORUM)
|
|
|
+ self.write_consistency = getattr(cassandra.ConsistencyLevel,
|
|
|
+ write_cons, cassandra.ConsistencyLevel.LOCAL_QUORUM)
|
|
|
|
|
|
self._connection = None
|
|
|
self._session = None
|
|
@@ -108,8 +102,10 @@ class NewCassandraBackend(BaseBackend):
|
|
|
port=self.port)
|
|
|
self._session = self._connection.connect(self.keyspace)
|
|
|
|
|
|
+ # We are forced to do concatenation below, as formatting would
|
|
|
+ # blow up on superficial %s that will be processed by Cassandra
|
|
|
self._write_stmt = cassandra.query.SimpleStatement(
|
|
|
- 'INSERT INTO '+self.table+' (task_id, status, result,'''
|
|
|
+ 'INSERT INTO %s (task_id, status, result,'''
|
|
|
''' date_done, traceback, children) VALUES'''
|
|
|
' (%s, %s, %s, %s, %s, %s) '+self.cqlexpires+';')
|
|
|
self._write_stmt.consistency_level = self.write_consistency
|
|
@@ -151,8 +147,7 @@ class NewCassandraBackend(BaseBackend):
|
|
|
"""Store return value and status of an executed task."""
|
|
|
self._get_connection(write=True)
|
|
|
|
|
|
- import sys
|
|
|
- if six.PY3:
|
|
|
+ if sys.version_info >= (3,):
|
|
|
buf = lambda x: bytes(x, 'utf8')
|
|
|
else:
|
|
|
buf = buffer
|