123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- # -*- coding: utf-8 -*-
- """Apache Cassandra result store backend using the DataStax driver."""
- from __future__ import absolute_import, unicode_literals
- import sys
- try: # pragma: no cover
- import cassandra
- import cassandra.auth
- import cassandra.cluster
- except ImportError: # pragma: no cover
- cassandra = None # noqa
- from celery import states
- from celery.exceptions import ImproperlyConfigured
- from celery.utils.log import get_logger
- from .base import BaseBackend
# Public names exported by this module.
__all__ = ['CassandraBackend']

# Module-level logger named after this module.
logger = get_logger(__name__)

# Message for the ImproperlyConfigured error raised by CassandraBackend
# when the ``cassandra`` driver package could not be imported.
E_NO_CASSANDRA = """
You need to install the cassandra-driver library to
use the Cassandra backend. See https://github.com/datastax/python-driver
"""

# Message for the ImproperlyConfigured error raised when the configured
# auth provider name is not an attribute of ``cassandra.auth``.
E_NO_SUCH_CASSANDRA_AUTH_PROVIDER = """
CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
See https://datastax.github.io/python-driver/api/cassandra/auth.html.
"""

# CQL template for storing one task result.  ``{table}`` and ``{expires}``
# are filled in with ``str.format``; the ``%s`` markers are left intact
# and bound to the values passed to ``session.execute``.
Q_INSERT_RESULT = """
INSERT INTO {table} (
task_id, status, result, date_done, traceback, children) VALUES (
%s, %s, %s, %s, %s, %s) {expires};
"""

# CQL template for reading a single row for a task id.  ``{table}`` is
# filled in with ``str.format``; ``%s`` is bound to the task id.
# NOTE(review): combined with the ``date_done DESC`` clustering order of
# Q_CREATE_RESULT_TABLE, ``LIMIT 1`` presumably yields the newest row --
# confirm for tables that were created by other means.
Q_SELECT_RESULT = """
SELECT status, result, date_done, traceback, children
FROM {table}
WHERE task_id=%s
LIMIT 1
"""

# CQL template for creating the result table: partitioned by ``task_id``
# and clustered by ``date_done`` in descending order.
Q_CREATE_RESULT_TABLE = """
CREATE TABLE {table} (
task_id text,
status text,
result blob,
date_done timestamp,
traceback blob,
children blob,
PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
"""

# Optional TTL clause substituted for ``{expires}`` in Q_INSERT_RESULT;
# ``{0}`` receives the configured entry TTL value.
Q_EXPIRES = """
USING TTL {0}
"""
if sys.version_info[0] >= 3:
    def buf_t(x):
        """Coerce an encoded payload to ``bytes``.

        Arguments:
            x (str, bytes, bytearray, memoryview): payload to convert.

        Returns:
            bytes: ``x`` unchanged in content if it is already bytes-like,
            otherwise ``x`` encoded as UTF-8.

        Raises:
            TypeError: if ``x`` is neither text nor bytes-like.
        """
        # Binary payloads must be passed through as-is: calling
        # ``bytes(x, 'utf8')`` on a bytes-like object raises TypeError.
        if isinstance(x, (bytes, bytearray, memoryview)):
            return bytes(x)
        return bytes(x, 'utf8')
else:
    # Python 2: wrap the payload in the built-in ``buffer`` type.
    buf_t = buffer  # noqa
class CassandraBackend(BaseBackend):
    """Cassandra result backend utilizing the DataStax driver.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`cassandra-driver` is not available,
            if servers, keyspace or table are not configured, or if the
            configured auth provider is not found in ``cassandra.auth``.
    """

    #: List of Cassandra servers with format: ``hostname``.
    servers = None

    supports_autoexpire = True      # autoexpire supported via entry_ttl

    def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
                 port=9042, **kwargs):
        """Read the configuration; no connection is opened here.

        Explicit arguments take precedence over the corresponding
        ``cassandra_*`` settings found in ``self.app.conf``.

        Arguments:
            servers: Cassandra contact points
                (setting ``cassandra_servers``).
            keyspace: keyspace to connect to
                (setting ``cassandra_keyspace``).
            table: table holding the results (setting ``cassandra_table``).
            entry_ttl: TTL applied to inserted rows
                (setting ``cassandra_entry_ttl``).
            port: port passed to the driver's ``Cluster``.
            **kwargs: forwarded to the base backend constructor.
        """
        super(CassandraBackend, self).__init__(**kwargs)

        if not cassandra:
            raise ImproperlyConfigured(E_NO_CASSANDRA)

        conf = self.app.conf
        self.servers = servers or conf.get('cassandra_servers', None)
        # NOTE(review): ``port`` defaults to 9042 (truthy), so the
        # ``cassandra_port`` setting is only consulted when a falsy port
        # is passed explicitly.  Left as-is to keep the signature stable.
        self.port = port or conf.get('cassandra_port', None)
        self.keyspace = keyspace or conf.get('cassandra_keyspace', None)
        self.table = table or conf.get('cassandra_table', None)

        if not self.servers or not self.keyspace or not self.table:
            raise ImproperlyConfigured('Cassandra backend not configured.')

        # Pre-render the optional ``USING TTL`` clause once.
        expires = entry_ttl or conf.get('cassandra_entry_ttl', None)
        self.cqlexpires = (
            Q_EXPIRES.format(expires) if expires is not None else '')

        # Consistency levels are looked up by name on ConsistencyLevel;
        # missing settings and unknown names fall back to LOCAL_QUORUM.
        read_cons = conf.get('cassandra_read_consistency') or 'LOCAL_QUORUM'
        write_cons = conf.get('cassandra_write_consistency') or 'LOCAL_QUORUM'

        self.read_consistency = getattr(
            cassandra.ConsistencyLevel, read_cons,
            cassandra.ConsistencyLevel.LOCAL_QUORUM)
        self.write_consistency = getattr(
            cassandra.ConsistencyLevel, write_cons,
            cassandra.ConsistencyLevel.LOCAL_QUORUM)

        # An auth provider is only built when both its class name and
        # its keyword arguments are configured.
        self.auth_provider = None
        auth_provider = conf.get('cassandra_auth_provider', None)
        auth_kwargs = conf.get('cassandra_auth_kwargs', None)
        if auth_provider and auth_kwargs:
            auth_provider_class = getattr(cassandra.auth, auth_provider, None)
            if not auth_provider_class:
                raise ImproperlyConfigured(E_NO_SUCH_CASSANDRA_AUTH_PROVIDER)
            self.auth_provider = auth_provider_class(**auth_kwargs)

        # Connection state; populated lazily by ``_get_connection``.
        self._connection = None
        self._session = None
        self._write_stmt = None
        self._read_stmt = None
        self._make_stmt = None

    def process_cleanup(self):
        """Shut down the cluster connection, if any, and reset state."""
        if self._connection is not None:
            self._connection.shutdown()     # also shuts down _session

        self._connection = None
        self._session = None

    def _get_connection(self, write=False):
        """Prepare the connection for action.

        Does nothing when a connection already exists.  Otherwise
        connects to the cluster, builds the read/write statements and,
        for writers, issues ``CREATE TABLE`` for the result table.

        Arguments:
            write (bool): are we a writer?

        Raises:
            Exception: whatever the driver raised while connecting or
                preparing; the connection state is reset to "not
                connected" before the error is re-raised.
        """
        # NOTE(review): because of this early return, a connection first
        # opened by a reader never issues CREATE TABLE, even if a later
        # call passes ``write=True``.
        if self._connection is not None:
            return
        try:
            self._connection = cassandra.cluster.Cluster(
                self.servers, port=self.port,
                auth_provider=self.auth_provider)
            self._session = self._connection.connect(self.keyspace)

            # ``str.format`` is used on the templates (rather than
            # %-formatting) so that the ``%s`` placeholders survive and
            # are processed when the statement is executed.
            self._write_stmt = cassandra.query.SimpleStatement(
                Q_INSERT_RESULT.format(
                    table=self.table, expires=self.cqlexpires),
            )
            self._write_stmt.consistency_level = self.write_consistency

            self._read_stmt = cassandra.query.SimpleStatement(
                Q_SELECT_RESULT.format(table=self.table),
            )
            self._read_stmt.consistency_level = self.read_consistency

            if write:
                # Only possible writers "workers" are allowed to issue
                # CREATE TABLE.  This is to prevent conflicting situations
                # where both task-creator and task-executor would issue it
                # at the same time.

                # Anyway; if you're doing anything critical, you should
                # have created this table in advance, in which case
                # this query will be a no-op (AlreadyExists)
                self._make_stmt = cassandra.query.SimpleStatement(
                    Q_CREATE_RESULT_TABLE.format(table=self.table),
                )
                self._make_stmt.consistency_level = self.write_consistency

                try:
                    self._session.execute(self._make_stmt)
                except cassandra.AlreadyExists:
                    pass

        except Exception:
            # Any failure here (timeout, no host available, bad
            # keyspace, ...) must not leave a half-initialised backend
            # behind: with ``_connection`` set but ``_session`` still
            # None, the next call would return early and then fail on
            # ``None.execute``.  Reset the state, then re-raise as-is.
            if self._connection is not None:
                self._connection.shutdown()     # also shuts down _session

            self._connection = None
            self._session = None
            raise   # we did fail after all - reraise

    def _store_result(self, task_id, result, state,
                      traceback=None, request=None, **kwargs):
        """Store return value and state of an executed task."""
        self._get_connection(write=True)

        # Parameter order must match the column list of Q_INSERT_RESULT.
        self._session.execute(self._write_stmt, (
            task_id,
            state,
            buf_t(self.encode(result)),
            self.app.now(),
            buf_t(self.encode(traceback)),
            buf_t(self.encode(self.current_task_children(request)))
        ))

    def as_uri(self, include_password=True):
        """Return a fixed URI; ``include_password`` is not used."""
        return 'cassandra://'

    def _get_task_meta_for(self, task_id):
        """Get task meta-data for a task by id.

        Returns a ``PENDING`` placeholder when no row is found.
        """
        self._get_connection()

        res = self._session.execute(self._read_stmt, (task_id, ))
        if not res:
            return {'status': states.PENDING, 'result': None}

        # Column order matches the SELECT list of Q_SELECT_RESULT.
        status, result, date_done, traceback, children = res[0]

        return self.meta_from_decoded({
            'task_id': task_id,
            'status': status,
            'result': self.decode(result),
            'date_done': date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
            'traceback': self.decode(traceback),
            'children': self.decode(children),
        })

    def __reduce__(self, args=(), kwargs=None):
        """Support pickling by recording servers, keyspace and table.

        Arguments:
            args (tuple): positional arguments passed to the base class.
            kwargs (dict): extra keyword arguments; copied, never
                mutated in place.
        """
        # A fresh dict on every call: the previous ``kwargs={}`` default
        # was a single shared dict that was mutated on each call.
        kwargs = dict(kwargs) if kwargs else {}
        kwargs.update(
            dict(servers=self.servers,
                 keyspace=self.keyspace,
                 table=self.table))
        return super(CassandraBackend, self).__reduce__(args, kwargs)
|