Explorar o código

Merge pull request #3034 from davidharrigan/cassandra_auth

Add auth options to Cassandra backend
Omer Katz %!s(int64=9) %!d(string=hai) anos
pai
achega
1c5ecc55db

+ 1 - 0
CONTRIBUTORS.txt

@@ -203,3 +203,4 @@ Chris Harris, 2015/11/27
 Valentyn Klindukh, 2016/01/15
 Valentyn Klindukh, 2016/01/15
 Wayne Chang, 2016/01/15
 Wayne Chang, 2016/01/15
 Mike Attwood, 2016/01/22
 Mike Attwood, 2016/01/22
+David Harrigan, 2016/02/01

+ 18 - 2
celery/backends/cassandra.py

@@ -11,6 +11,7 @@ from __future__ import absolute_import
 import sys
 import sys
 try:  # pragma: no cover
 try:  # pragma: no cover
     import cassandra
     import cassandra
+    import cassandra.auth
     import cassandra.cluster
     import cassandra.cluster
 except ImportError:  # pragma: no cover
 except ImportError:  # pragma: no cover
     cassandra = None   # noqa
     cassandra = None   # noqa
@@ -29,6 +30,11 @@ You need to install the cassandra-driver library to
 use the Cassandra backend. See https://github.com/datastax/python-driver
 use the Cassandra backend. See https://github.com/datastax/python-driver
 """
 """
 
 
+E_NO_SUCH_CASSANDRA_AUTH_PROVIDER = """
+CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
+See https://datastax.github.io/python-driver/api/cassandra/auth.html.
+"""
+
 Q_INSERT_RESULT = """
 Q_INSERT_RESULT = """
 INSERT INTO {table} (
 INSERT INTO {table} (
     task_id, status, result, date_done, traceback, children) VALUES (
     task_id, status, result, date_done, traceback, children) VALUES (
@@ -121,6 +127,15 @@ class CassandraBackend(BaseBackend):
             cassandra.ConsistencyLevel.LOCAL_QUORUM,
             cassandra.ConsistencyLevel.LOCAL_QUORUM,
         )
         )
 
 
+        self.auth_provider = None
+        auth_provider = conf.get('cassandra_auth_provider', None)
+        auth_kwargs = conf.get('cassandra_auth_kwargs', None)
+        if auth_provider and auth_kwargs:
+            auth_provider_class = getattr(cassandra.auth, auth_provider, None)
+            if not auth_provider_class:
+                raise ImproperlyConfigured(E_NO_SUCH_CASSANDRA_AUTH_PROVIDER)
+            self.auth_provider = auth_provider_class(**auth_kwargs)
+
         self._connection = None
         self._connection = None
         self._session = None
         self._session = None
         self._write_stmt = None
         self._write_stmt = None
@@ -142,8 +157,9 @@ class CassandraBackend(BaseBackend):
         """
         """
         if self._connection is None:
         if self._connection is None:
             try:
             try:
-                self._connection = cassandra.cluster.Cluster(self.servers,
-                                                             port=self.port)
+                self._connection = cassandra.cluster.Cluster(
+                    self.servers, port=self.port,
+                    auth_provider=self.auth_provider)
                 self._session = self._connection.connect(self.keyspace)
                 self._session = self._connection.connect(self.keyspace)
 
 
                 # We are forced to do concatenation below, as formatting would
                 # We are forced to do concatenation below, as formatting would

+ 28 - 1
celery/tests/backends/test_cassandra.py

@@ -9,7 +9,7 @@ from celery.tests.case import (
     AppCase, Mock, mock_module, depends_on_current_app
     AppCase, Mock, mock_module, depends_on_current_app
 )
 )
 
 
-CASSANDRA_MODULES = ['cassandra', 'cassandra.cluster']
+CASSANDRA_MODULES = ['cassandra', 'cassandra.auth', 'cassandra.cluster']
 
 
 
 
 class Object(object):
 class Object(object):
@@ -168,3 +168,30 @@ class test_CassandraBackend(AppCase):
                 x.process_cleanup()
                 x.process_cleanup()
 
 
             self.assertEquals(RAMHoggingCluster.objects_alive, 0)
             self.assertEquals(RAMHoggingCluster.objects_alive, 0)
+
+    def test_auth_provider(self):
+        """Ensure valid auth_provider works properly, and invalid one raises
+        ImproperlyConfigured exception."""
+        class DummyAuth(object):
+            ValidAuthProvider = Mock()
+
+        with mock_module(*CASSANDRA_MODULES):
+            from celery.backends import cassandra as mod
+
+            mod.cassandra = Mock()
+            mod.cassandra.auth = DummyAuth
+
+            # Valid auth_provider
+            self.app.conf.cassandra_auth_provider = 'ValidAuthProvider'
+            self.app.conf.cassandra_auth_kwargs = {
+                'username': 'stuff'
+            }
+            mod.CassandraBackend(app=self.app)
+
+            # Invalid auth_provider
+            self.app.conf.cassandra_auth_provider = 'SpiderManAuth'
+            self.app.conf.cassandra_auth_kwargs = {
+                'username': 'Jack'
+            }
+            with self.assertRaises(ImproperlyConfigured):
+                mod.CassandraBackend(app=self.app)

+ 20 - 0
docs/configuration.rst

@@ -970,6 +970,26 @@ cassandra_entry_ttl
 Time-to-live for status entries. They will expire and be removed after that many seconds
 Time-to-live for status entries. They will expire and be removed after that many seconds
 after adding. Default (None) means they will never expire.
 after adding. Default (None) means they will never expire.
 
 
+.. setting:: cassandra_auth_provider
+
+cassandra_auth_provider
+~~~~~~~~~~~~~~~~~~~~~~~
+
+AuthProvider class within ``cassandra.auth`` module to use.  Values can be
+``PlainTextAuthProvider`` or ``SaslAuthProvider``.
+
+.. setting:: cassandra_auth_kwargs
+
+cassandra_auth_kwargs
+~~~~~~~~~~~~~~~~~~~~~
+
+Named arguments to pass into the auth provider. e.g.::
+
+    cassandra_auth_kwargs = {
+        username: 'cassandra',
+        password: 'cassandra'
+    }
+
 Example configuration
 Example configuration
 ~~~~~~~~~~~~~~~~~~~~~
 ~~~~~~~~~~~~~~~~~~~~~