|
@@ -11,6 +11,7 @@ from __future__ import absolute_import
|
|
|
import sys
|
|
|
try: # pragma: no cover
|
|
|
import cassandra
|
|
|
+ import cassandra.cluster
|
|
|
except ImportError: # pragma: no cover
|
|
|
cassandra = None # noqa
|
|
|
|
|
@@ -19,22 +20,62 @@ from celery.exceptions import ImproperlyConfigured
|
|
|
from celery.utils.log import get_logger
|
|
|
from .base import BaseBackend
|
|
|
|
|
|
-__all__ = ['NewCassandraBackend']
|
|
|
+__all__ = ['CassandraBackend']
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
+E_NO_CASSANDRA = """
|
|
|
+You need to install the cassandra-driver library to
|
|
|
+use the Cassandra backend. See https://github.com/datastax/python-driver
|
|
|
+"""
|
|
|
+
|
|
|
+Q_INSERT_RESULT = """
|
|
|
+INSERT INTO {table} (
|
|
|
+ task_id, status, result, date_done, traceback, children) VALUES (
|
|
|
+ %s, %s, %s, %s, %s, %s) {expires};
|
|
|
+"""
|
|
|
+
|
|
|
+Q_SELECT_RESULT = """
|
|
|
+SELECT status, result, date_done, traceback, children
|
|
|
+FROM {table}
|
|
|
+WHERE task_id=%s
|
|
|
+LIMIT 1
|
|
|
+"""
|
|
|
+
|
|
|
+Q_CREATE_RESULT_TABLE = """
|
|
|
+CREATE TABLE {table} (
|
|
|
+ task_id text,
|
|
|
+ status text,
|
|
|
+ result blob,
|
|
|
+ date_done timestamp,
|
|
|
+ traceback blob,
|
|
|
+ children blob,
|
|
|
+ PRIMARY KEY ((task_id), date_done)
|
|
|
+) WITH CLUSTERING ORDER BY (date_done DESC);
|
|
|
+"""
|
|
|
+
|
|
|
+Q_EXPIRES = """
|
|
|
+ USING TTL {0}
|
|
|
+"""
|
|
|
|
|
|
-class NewCassandraBackend(BaseBackend):
|
|
|
- """New Cassandra backend utilizing DataStax driver
|
|
|
+if sys.version_info[0] == 3:
|
|
|
+ def buf_t(x):
|
|
|
+ return bytes(x, 'utf8')
|
|
|
+else:
|
|
|
+ buf_t = buffer # noqa
|
|
|
|
|
|
- .. attribute:: servers
|
|
|
|
|
|
- List of Cassandra servers with format: ``hostname``
|
|
|
+class CassandraBackend(BaseBackend):
|
|
|
+ """Cassandra backend utilizing DataStax driver
|
|
|
|
|
|
:raises celery.exceptions.ImproperlyConfigured: if
|
|
|
module :mod:`cassandra` is not available.
|
|
|
|
|
|
"""
|
|
|
+
|
|
|
+ #: List of Cassandra servers with format: ``hostname``.
|
|
|
+ servers = None
|
|
|
+
|
|
|
supports_autoexpire = True # autoexpire supported via entry_ttl
|
|
|
|
|
|
def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
|
|
@@ -45,12 +86,10 @@ class NewCassandraBackend(BaseBackend):
|
|
|
the :setting:`CASSANDRA_SERVERS` setting is not set.
|
|
|
|
|
|
"""
|
|
|
- super(NewCassandraBackend, self).__init__(**kwargs)
|
|
|
+ super(CassandraBackend, self).__init__(**kwargs)
|
|
|
|
|
|
if not cassandra:
|
|
|
- raise ImproperlyConfigured(
|
|
|
- 'You need to install the cassandra library to use the '
|
|
|
- 'Cassandra backend. See https://github.com/datastax/python-driver')
|
|
|
+ raise ImproperlyConfigured(E_NO_CASSANDRA)
|
|
|
|
|
|
conf = self.app.conf
|
|
|
self.servers = (servers or
|
|
@@ -67,18 +106,20 @@ class NewCassandraBackend(BaseBackend):
|
|
|
|
|
|
expires = (entry_ttl or conf.get('CASSANDRA_ENTRY_TTL', None))
|
|
|
|
|
|
- if expires is not None:
|
|
|
- self.cqlexpires = ' USING TTL %s' % (expires, )
|
|
|
- else:
|
|
|
- self.cqlexpires = ''
|
|
|
+ self.cqlexpires = (Q_EXPIRES.format(expires)
|
|
|
+ if expires is not None else '')
|
|
|
|
|
|
read_cons = conf.get('CASSANDRA_READ_CONSISTENCY') or 'LOCAL_QUORUM'
|
|
|
write_cons = conf.get('CASSANDRA_WRITE_CONSISTENCY') or 'LOCAL_QUORUM'
|
|
|
|
|
|
- self.read_consistency = getattr(cassandra.ConsistencyLevel,
|
|
|
- read_cons, cassandra.ConsistencyLevel.LOCAL_QUORUM)
|
|
|
- self.write_consistency = getattr(cassandra.ConsistencyLevel,
|
|
|
- write_cons, cassandra.ConsistencyLevel.LOCAL_QUORUM)
|
|
|
+ self.read_consistency = getattr(
|
|
|
+ cassandra.ConsistencyLevel, read_cons,
|
|
|
+ cassandra.ConsistencyLevel.LOCAL_QUORUM,
|
|
|
+ )
|
|
|
+ self.write_consistency = getattr(
|
|
|
+ cassandra.ConsistencyLevel, write_cons,
|
|
|
+ cassandra.ConsistencyLevel.LOCAL_QUORUM,
|
|
|
+ )
|
|
|
|
|
|
self._connection = None
|
|
|
self._session = None
|
|
@@ -87,15 +128,16 @@ class NewCassandraBackend(BaseBackend):
|
|
|
|
|
|
def process_cleanup(self):
|
|
|
if self._connection is not None:
|
|
|
- self._session.shutdown()
|
|
|
self._connection = None
|
|
|
+ if self._session is not None:
|
|
|
+ self._session.shutdown()
|
|
|
self._session = None
|
|
|
|
|
|
def _get_connection(self, write=False):
|
|
|
- """
|
|
|
- Prepare the connection for action
|
|
|
+ """Prepare the connection for action
|
|
|
|
|
|
:param write: bool - are we a writer?
|
|
|
+
|
|
|
"""
|
|
|
if self._connection is None:
|
|
|
self._connection = cassandra.cluster.Cluster(self.servers,
|
|
@@ -105,15 +147,14 @@ class NewCassandraBackend(BaseBackend):
|
|
|
# We are forced to do concatenation below, as formatting would
|
|
|
# blow up on superficial %s that will be processed by Cassandra
|
|
|
self._write_stmt = cassandra.query.SimpleStatement(
|
|
|
- 'INSERT INTO '+self.table+''' (task_id, status, result,'''
|
|
|
- ''' date_done, traceback, children) VALUES'''
|
|
|
- ' (%s, %s, %s, %s, %s, %s) '+self.cqlexpires+';')
|
|
|
+ Q_INSERT_RESULT.format(
|
|
|
+ table=self.table, expires=self.cqlexpires),
|
|
|
+ )
|
|
|
self._write_stmt.consistency_level = self.write_consistency
|
|
|
|
|
|
self._read_stmt = cassandra.query.SimpleStatement(
|
|
|
- '''SELECT status, result, date_done, traceback, children
|
|
|
- FROM '''+self.table+'''
|
|
|
- WHERE task_id=%s LIMIT 1''')
|
|
|
+ Q_SELECT_RESULT.format(table=self.table),
|
|
|
+ )
|
|
|
self._read_stmt.consistency_level = self.read_consistency
|
|
|
|
|
|
if write:
|
|
@@ -126,16 +167,8 @@ class NewCassandraBackend(BaseBackend):
|
|
|
# have probably created this table in advance, in which case
|
|
|
# this query will be a no-op (instant fail with AlreadyExists)
|
|
|
self._make_stmt = cassandra.query.SimpleStatement(
|
|
|
- '''CREATE TABLE '''+self.table+''' (
|
|
|
- task_id text,
|
|
|
- status text,
|
|
|
- result blob,
|
|
|
- date_done timestamp,
|
|
|
- traceback blob,
|
|
|
- children blob,
|
|
|
- PRIMARY KEY ((task_id), date_done)
|
|
|
- )
|
|
|
- WITH CLUSTERING ORDER BY (date_done DESC);''')
|
|
|
+ Q_CREATE_RESULT_TABLE.format(table=self.table),
|
|
|
+ )
|
|
|
self._make_stmt.consistency_level = self.write_consistency
|
|
|
try:
|
|
|
self._session.execute(self._make_stmt)
|
|
@@ -147,18 +180,13 @@ class NewCassandraBackend(BaseBackend):
|
|
|
"""Store return value and status of an executed task."""
|
|
|
self._get_connection(write=True)
|
|
|
|
|
|
- if sys.version_info >= (3,):
|
|
|
- buf = lambda x: bytes(x, 'utf8')
|
|
|
- else:
|
|
|
- buf = buffer
|
|
|
-
|
|
|
self._session.execute(self._write_stmt, (
|
|
|
task_id,
|
|
|
status,
|
|
|
- buf(self.encode(result)),
|
|
|
+ buf_t(self.encode(result)),
|
|
|
self.app.now(),
|
|
|
- buf(self.encode(traceback)),
|
|
|
- buf(self.encode(self.current_task_children(request)))
|
|
|
+ buf_t(self.encode(traceback)),
|
|
|
+ buf_t(self.encode(self.current_task_children(request)))
|
|
|
))
|
|
|
|
|
|
def _get_task_meta_for(self, task_id):
|
|
@@ -185,4 +213,4 @@ class NewCassandraBackend(BaseBackend):
|
|
|
dict(servers=self.servers,
|
|
|
keyspace=self.keyspace,
|
|
|
table=self.table))
|
|
|
- return super(NewCassandraBackend, self).__reduce__(args, kwargs)
|
|
|
+ return super(CassandraBackend, self).__reduce__(args, kwargs)
|