@@ -3,59 +3,83 @@
     celery.backends.cassandra
     ~~~~~~~~~~~~~~~~~~~~~~~~~
 
-    Apache Cassandra result store backend.
+    Apache Cassandra result store backend using the DataStax driver.
 
 """
 from __future__ import absolute_import
 
+import sys
 try:  # pragma: no cover
-    import pycassa
-    from thrift import Thrift
-    C = pycassa.cassandra.ttypes
+    import cassandra
+    import cassandra.cluster
 except ImportError:  # pragma: no cover
-    pycassa = None  # noqa
-
-import socket
-import time
+    cassandra = None  # noqa
 
 from celery import states
 from celery.exceptions import ImproperlyConfigured
-from celery.five import monotonic
-from celery.utils import deprecated
 from celery.utils.log import get_logger
-
 from .base import BaseBackend
 
 __all__ = ['CassandraBackend']
 
 logger = get_logger(__name__)
 
+E_NO_CASSANDRA = """
+You need to install the cassandra-driver library to
+use the Cassandra backend. See https://github.com/datastax/python-driver
+"""
 
-class CassandraBackend(BaseBackend):
-    """Highly fault tolerant Cassandra backend.
+Q_INSERT_RESULT = """
+INSERT INTO {table} (
+    task_id, status, result, date_done, traceback, children) VALUES (
+        %s, %s, %s, %s, %s, %s) {expires};
+"""
+
+Q_SELECT_RESULT = """
+SELECT status, result, date_done, traceback, children
+FROM {table}
+WHERE task_id=%s
+LIMIT 1
+"""
+
+Q_CREATE_RESULT_TABLE = """
+CREATE TABLE {table} (
+    task_id text,
+    status text,
+    result blob,
+    date_done timestamp,
+    traceback blob,
+    children blob,
+    PRIMARY KEY ((task_id), date_done)
+) WITH CLUSTERING ORDER BY (date_done DESC);
+"""
 
-    .. attribute:: servers
+Q_EXPIRES = """
+    USING TTL {0}
+"""
+
+if sys.version_info[0] == 3:
+    def buf_t(x):
+        return bytes(x, 'utf8')
+else:
+    buf_t = buffer  # noqa
 
-        List of Cassandra servers with format: ``hostname:port``.
+
+class CassandraBackend(BaseBackend):
+    """Cassandra backend utilizing the DataStax driver.
 
     :raises celery.exceptions.ImproperlyConfigured: if
-        module :mod:`pycassa` is not available.
+        module :mod:`cassandra` is not available.
 
     """
-    servers = []
-    keyspace = None
-    column_family = None
-    detailed_mode = False
-    _retry_timeout = 300
-    _retry_wait = 3
-    supports_autoexpire = True
-
-    @deprecated(description='The old cassandra backend',
-                deprecation='4.0',
-                removal='5.0',
-                alternative='Use the `new_cassandra` result backend instead')
-    def __init__(self, servers=None, keyspace=None, column_family=None,
-                 cassandra_options=None, detailed_mode=False, **kwargs):
+
+    #: List of Cassandra servers with format: ``hostname``.
+    servers = None
+
+    supports_autoexpire = True  # autoexpire supported via entry_ttl
+
+    def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
+                 port=9042, **kwargs):
"""Initialize Cassandra backend.
|
|
|
|
|
|
Raises :class:`celery.exceptions.ImproperlyConfigured` if
|
|
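NOTE (not part of the patch): the hunk above replaces the pycassa column-family
model with plain CQL templates. A rough sketch of how those templates are meant
to be combined, assuming an illustrative table name 'task_results' and an
illustrative TTL of 86400 seconds (neither value is defined by the patch):

    from celery.backends.cassandra import Q_EXPIRES, Q_INSERT_RESULT

    # The backend uses '' here when no entry_ttl / cassandra_entry_ttl is set.
    cqlexpires = Q_EXPIRES.format(86400)
    insert_cql = Q_INSERT_RESULT.format(table='task_results',
                                        expires=cqlexpires)
    # The %s placeholders survive str.format() untouched; they are bind
    # markers the DataStax driver fills in when the statement is executed.

The same pattern applies to Q_SELECT_RESULT and Q_CREATE_RESULT_TABLE, which
only need the table name.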
@@ -64,129 +88,139 @@ class CassandraBackend(BaseBackend):
         """
         super(CassandraBackend, self).__init__(**kwargs)
 
-        if not pycassa:
-            raise ImproperlyConfigured(
-                'You need to install the pycassa library to use the '
-                'Cassandra backend. See https://github.com/pycassa/pycassa')
+        if not cassandra:
+            raise ImproperlyConfigured(E_NO_CASSANDRA)
 
         conf = self.app.conf
         self.servers = (servers or
-                        conf.get('cassandra_servers') or
-                        self.servers)
+                        conf.get('cassandra_servers', None))
+        self.port = (port or
+                     conf.get('cassandra_port', None))
         self.keyspace = (keyspace or
-                         conf.get('cassandra_keyspace') or
-                         self.keyspace)
-        self.column_family = (column_family or
-                              conf.get('cassandra_column_family') or
-                              self.column_family)
-        self.cassandra_options = dict(conf.get('cassandra_options') or {},
-                                      **cassandra_options or {})
-        self.detailed_mode = (detailed_mode or
-                              conf.get('cassandra_detailed_mode') or
-                              self.detailed_mode)
+                         conf.get('cassandra_keyspace', None))
+        self.table = (table or
+                      conf.get('cassandra_table', None))
+
+        if not self.servers or not self.keyspace or not self.table:
+            raise ImproperlyConfigured('Cassandra backend not configured.')
+
+        expires = (entry_ttl or conf.get('cassandra_entry_ttl', None))
+
+        self.cqlexpires = (Q_EXPIRES.format(expires)
+                           if expires is not None else '')
+
         read_cons = conf.get('cassandra_read_consistency') or 'LOCAL_QUORUM'
         write_cons = conf.get('cassandra_write_consistency') or 'LOCAL_QUORUM'
-        try:
-            self.read_consistency = getattr(pycassa.ConsistencyLevel,
-                                            read_cons)
-        except AttributeError:
-            self.read_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
-        try:
-            self.write_consistency = getattr(pycassa.ConsistencyLevel,
-                                             write_cons)
-        except AttributeError:
-            self.write_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
-
-        if not self.servers or not self.keyspace or not self.column_family:
-            raise ImproperlyConfigured(
-                'Cassandra backend not configured.')
-
-        self._column_family = None
-
-    def _retry_on_error(self, fun, *args, **kwargs):
-        ts = monotonic() + self._retry_timeout
-        while 1:
-            try:
-                return fun(*args, **kwargs)
-            except (pycassa.InvalidRequestException,
-                    pycassa.TimedOutException,
-                    pycassa.UnavailableException,
-                    pycassa.AllServersUnavailable,
-                    socket.error,
-                    socket.timeout,
-                    Thrift.TException) as exc:
-                if monotonic() > ts:
-                    raise
-                logger.warning('Cassandra error: %r. Retrying...', exc)
-                time.sleep(self._retry_wait)
-
-    def _get_column_family(self):
-        if self._column_family is None:
-            conn = pycassa.ConnectionPool(self.keyspace,
-                                          server_list=self.servers,
-                                          **self.cassandra_options)
-            self._column_family = pycassa.ColumnFamily(
-                conn, self.column_family,
-                read_consistency_level=self.read_consistency,
-                write_consistency_level=self.write_consistency,
-            )
-        return self._column_family
+
+        self.read_consistency = getattr(
+            cassandra.ConsistencyLevel, read_cons,
+            cassandra.ConsistencyLevel.LOCAL_QUORUM,
+        )
+        self.write_consistency = getattr(
+            cassandra.ConsistencyLevel, write_cons,
+            cassandra.ConsistencyLevel.LOCAL_QUORUM,
+        )
+
+        self._connection = None
+        self._session = None
+        self._write_stmt = None
+        self._read_stmt = None
 
     def process_cleanup(self):
-        if self._column_family is not None:
-            self._column_family = None
+        if self._connection is not None:
+            self._connection = None
+        if self._session is not None:
+            self._session.shutdown()
+            self._session = None
+
+    def _get_connection(self, write=False):
+        """Prepare the connection for action.
+
+        :param write: bool - are we a writer?
+
+        """
+        if self._connection is None:
+            try:
+                self._connection = cassandra.cluster.Cluster(self.servers,
+                                                             port=self.port)
+                self._session = self._connection.connect(self.keyspace)
+
+                # The table name and TTL clause are filled in with .format();
+                # the %s placeholders are bind markers handled by Cassandra.
+                self._write_stmt = cassandra.query.SimpleStatement(
+                    Q_INSERT_RESULT.format(
+                        table=self.table, expires=self.cqlexpires),
+                )
+                self._write_stmt.consistency_level = self.write_consistency
+
+                self._read_stmt = cassandra.query.SimpleStatement(
+                    Q_SELECT_RESULT.format(table=self.table),
+                )
+                self._read_stmt.consistency_level = self.read_consistency
+
+                if write:
+                    # Only the possible writers ("workers") are allowed to
+                    # issue CREATE TABLE. This prevents the conflict where the
+                    # task creator and the task executor would both issue it
+                    # at the same time.
+
+                    # In any case, if you're doing anything critical you
+                    # should have created this table in advance; then
+                    # this query will be a no-op (AlreadyExists).
+                    self._make_stmt = cassandra.query.SimpleStatement(
+                        Q_CREATE_RESULT_TABLE.format(table=self.table),
+                    )
+                    self._make_stmt.consistency_level = self.write_consistency
+                    try:
+                        self._session.execute(self._make_stmt)
+                    except cassandra.AlreadyExists:
+                        pass
+
+            except cassandra.OperationTimedOut:
+                # A heavily loaded or unreachable Cassandra cluster failed to
+                # respond; leave this class in a consistent state.
+                self._connection = None
+                if self._session is not None:
+                    self._session.shutdown()
+
+                raise  # we did fail after all - reraise
 
     def _store_result(self, task_id, result, status,
                       traceback=None, request=None, **kwargs):
         """Store return value and status of an executed task."""
+        self._get_connection(write=True)
 
-        def _do_store():
-            cf = self._get_column_family()
-            date_done = self.app.now()
-            meta = {'status': status,
-                    'date_done': date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
-                    'traceback': self.encode(traceback),
-                    'result': self.encode(result),
-                    'children': self.encode(
-                        self.current_task_children(request),
-                    )}
-            if self.detailed_mode:
-                cf.insert(
-                    task_id, {date_done: self.encode(meta)}, ttl=self.expires,
-                )
-            else:
-                cf.insert(task_id, meta, ttl=self.expires)
-
-        return self._retry_on_error(_do_store)
+        self._session.execute(self._write_stmt, (
+            task_id,
+            status,
+            buf_t(self.encode(result)),
+            self.app.now(),
+            buf_t(self.encode(traceback)),
+            buf_t(self.encode(self.current_task_children(request)))
+        ))
 
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
+        self._get_connection()
 
-        def _do_get():
-            cf = self._get_column_family()
-            try:
-                if self.detailed_mode:
-                    row = cf.get(task_id, column_reversed=True, column_count=1)
-                    return self.decode(list(row.values())[0])
-                else:
-                    obj = cf.get(task_id)
-                    return self.meta_from_decoded({
-                        'task_id': task_id,
-                        'status': obj['status'],
-                        'result': self.decode(obj['result']),
-                        'date_done': obj['date_done'],
-                        'traceback': self.decode(obj['traceback']),
-                        'children': self.decode(obj['children']),
-                    })
-            except (KeyError, pycassa.NotFoundException):
-                return {'status': states.PENDING, 'result': None}
-
-        return self._retry_on_error(_do_get)
+        res = self._session.execute(self._read_stmt, (task_id, ))
+        if not res:
+            return {'status': states.PENDING, 'result': None}
+
+        status, result, date_done, traceback, children = res[0]
+
+        return self.meta_from_decoded({
+            'task_id': task_id,
+            'status': status,
+            'result': self.decode(result),
+            'date_done': date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
+            'traceback': self.decode(traceback),
+            'children': self.decode(children),
+        })
 
     def __reduce__(self, args=(), kwargs={}):
         kwargs.update(
             dict(servers=self.servers,
                  keyspace=self.keyspace,
-                 column_family=self.column_family,
-                 cassandra_options=self.cassandra_options))
+                 table=self.table))
         return super(CassandraBackend, self).__reduce__(args, kwargs)
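
NOTE (not part of the patch): a minimal configuration sketch for the new
backend. Only the setting names are taken from the code above; every concrete
value, and the 'cassandra://' result_backend URL, is an assumption made for
illustration. (Also note that because __init__ defaults port to 9042, the
cassandra_port setting is only consulted when a falsy port is passed
explicitly.)

    from celery import Celery

    app = Celery('tasks')
    app.conf.update(
        result_backend='cassandra://',        # assumed backend alias
        cassandra_servers=['127.0.0.1'],      # list of hostnames, see `servers`
        cassandra_keyspace='celery',          # keyspace must already exist
        cassandra_table='task_results',       # created on first write if missing
        cassandra_entry_ttl=86400,            # optional; results expire via TTL
        cassandra_read_consistency='LOCAL_QUORUM',
        cassandra_write_consistency='LOCAL_QUORUM',
    )

The same values can also be passed directly to CassandraBackend(servers=...,
keyspace=..., table=..., entry_ttl=..., port=...), mirroring the __init__
signature introduced in this patch.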