瀏覽代碼

Merge branch 'master' of github.com:celery/celery

Ask Solem 9 年之前
父節點
當前提交
f86ed7b968

+ 0 - 1
.coveragerc

@@ -16,7 +16,6 @@ omit =
     *celery/contrib/sphinx.py
     *celery/backends/couchdb.py
     *celery/backends/couchbase.py
-    *celery/backends/cassandra.py
     *celery/backends/riak.py
     *celery/concurrency/asynpool.py
     *celery/utils/debug.py

+ 1 - 0
CONTRIBUTORS.txt

@@ -203,3 +203,4 @@ Chris Harris, 2015/11/27
 Valentyn Klindukh, 2016/01/15
 Wayne Chang, 2016/01/15
 Mike Attwood, 2016/01/22
+David Harrigan, 2016/02/01

+ 18 - 2
celery/backends/cassandra.py

@@ -11,6 +11,7 @@ from __future__ import absolute_import
 import sys
 try:  # pragma: no cover
     import cassandra
+    import cassandra.auth
     import cassandra.cluster
 except ImportError:  # pragma: no cover
     cassandra = None   # noqa
@@ -29,6 +30,11 @@ You need to install the cassandra-driver library to
 use the Cassandra backend. See https://github.com/datastax/python-driver
 """
 
+E_NO_SUCH_CASSANDRA_AUTH_PROVIDER = """
+CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
+See https://datastax.github.io/python-driver/api/cassandra/auth.html.
+"""
+
 Q_INSERT_RESULT = """
 INSERT INTO {table} (
     task_id, status, result, date_done, traceback, children) VALUES (
@@ -121,6 +127,15 @@ class CassandraBackend(BaseBackend):
             cassandra.ConsistencyLevel.LOCAL_QUORUM,
         )
 
+        self.auth_provider = None
+        auth_provider = conf.get('cassandra_auth_provider', None)
+        auth_kwargs = conf.get('cassandra_auth_kwargs', None)
+        if auth_provider and auth_kwargs:
+            auth_provider_class = getattr(cassandra.auth, auth_provider, None)
+            if not auth_provider_class:
+                raise ImproperlyConfigured(E_NO_SUCH_CASSANDRA_AUTH_PROVIDER)
+            self.auth_provider = auth_provider_class(**auth_kwargs)
+
         self._connection = None
         self._session = None
         self._write_stmt = None
@@ -142,8 +157,9 @@ class CassandraBackend(BaseBackend):
         """
         if self._connection is None:
             try:
-                self._connection = cassandra.cluster.Cluster(self.servers,
-                                                             port=self.port)
+                self._connection = cassandra.cluster.Cluster(
+                    self.servers, port=self.port,
+                    auth_provider=self.auth_provider)
                 self._session = self._connection.connect(self.keyspace)
 
                 # We are forced to do concatenation below, as formatting would

+ 28 - 1
celery/tests/backends/test_cassandra.py

@@ -9,7 +9,7 @@ from celery.tests.case import (
     AppCase, Mock, mock_module, depends_on_current_app
 )
 
-CASSANDRA_MODULES = ['cassandra', 'cassandra.cluster']
+CASSANDRA_MODULES = ['cassandra', 'cassandra.auth', 'cassandra.cluster']
 
 
 class Object(object):
@@ -168,3 +168,30 @@ class test_CassandraBackend(AppCase):
                 x.process_cleanup()
 
             self.assertEquals(RAMHoggingCluster.objects_alive, 0)
+
+    def test_auth_provider(self):
+        """Ensure valid auth_provider works properly, and invalid one raises
+        ImproperlyConfigured exception."""
+        class DummyAuth(object):
+            ValidAuthProvider = Mock()
+
+        with mock_module(*CASSANDRA_MODULES):
+            from celery.backends import cassandra as mod
+
+            mod.cassandra = Mock()
+            mod.cassandra.auth = DummyAuth
+
+            # Valid auth_provider
+            self.app.conf.cassandra_auth_provider = 'ValidAuthProvider'
+            self.app.conf.cassandra_auth_kwargs = {
+                'username': 'stuff'
+            }
+            mod.CassandraBackend(app=self.app)
+
+            # Invalid auth_provider
+            self.app.conf.cassandra_auth_provider = 'SpiderManAuth'
+            self.app.conf.cassandra_auth_kwargs = {
+                'username': 'Jack'
+            }
+            with self.assertRaises(ImproperlyConfigured):
+                mod.CassandraBackend(app=self.app)

+ 20 - 0
docs/configuration.rst

@@ -970,6 +970,26 @@ cassandra_entry_ttl
 Time-to-live for status entries. They will expire and be removed after that many seconds
 after adding. Default (None) means they will never expire.
 
+.. setting:: cassandra_auth_provider
+
+cassandra_auth_provider
+~~~~~~~~~~~~~~~~~~~~~~~
+
+AuthProvider class within ``cassandra.auth`` module to use.  Values can be
+``PlainTextAuthProvider`` or ``SaslAuthProvider``.
+
+.. setting:: cassandra_auth_kwargs
+
+cassandra_auth_kwargs
+~~~~~~~~~~~~~~~~~~~~~
+
+Named arguments to pass into the auth provider. e.g.::
+
+    cassandra_auth_kwargs = {
+        username: 'cassandra',
+        password: 'cassandra'
+    }
+
 Example configuration
 ~~~~~~~~~~~~~~~~~~~~~
 

+ 1 - 1
docs/internals/deprecation.rst

@@ -156,7 +156,7 @@ Task_sent signal
 ----------------
 
 The :signal:`task_sent` signal will be removed in version 4.0.
-Please use the :signal:`before_task_publish` and :signal:`after_task_publush`
+Please use the :signal:`before_task_publish` and :signal:`after_task_publish`
 signals instead.
 
 Result

+ 0 - 6
docs/tutorials/daemonizing.rst

@@ -424,9 +424,3 @@ Windows
 See this excellent external tutorial:
 
 http://www.calazan.com/windows-tip-run-applications-in-the-background-using-task-scheduler/
-
-CentOS
-======
-In CentOS we can take advantage of built-in service helpers, such as the
-pid-based status checker function in ``/etc/init.d/functions``.
-See the sample script in http://github.com/celery/celery/tree/3.1/extra/centos/.

+ 1 - 1
examples/eventlet/bulk_task_producer.py

@@ -10,7 +10,7 @@ class Receipt(object):
     result = None
 
     def __init__(self, callback=None):
-        self.callback = None
+        self.callback = callback
         self.ready = Event()
 
     def finished(self, result):