Forráskód Böngészése

Merge branch 'jterrace/master'

Ask Solem 14 éve
szülő
commit
ac56cac22c
1 módosított fájl, 21 hozzáadás és 9 törlés
  1. 21 9
      celery/backends/cassandra.py

+ 21 - 9
celery/backends/cassandra.py

@@ -2,7 +2,7 @@
 try:
     import pycassa
     from thrift import Thrift
-    C = __import__('cassandra').ttypes          # FIXME Namespace kludge
+    C = pycassa.cassandra.ttypes
 except ImportError:
     pycassa = None
 
@@ -59,7 +59,7 @@ class CassandraBackend(BaseDictBackend):
         if not pycassa:
             raise ImproperlyConfigured(
                     "You need to install the pycassa library to use the "
-                    "Cassandra backend. See http://github.com/vomjom/pycassa")
+                    "Cassandra backend. See https://github.com/pycassa/pycassa")
 
         self.servers = servers or \
                         self.app.conf.get("CASSANDRA_SERVERS", self.servers)
@@ -72,6 +72,20 @@ class CassandraBackend(BaseDictBackend):
         self.cassandra_options = dict(cassandra_options or {},
                                    **self.app.conf.get("CASSANDRA_OPTIONS",
                                                        {}))
+        read_cons = self.app.conf.get("CASSANDRA_READ_CONSISTENCY",
+                                      "LOCAL_QUORUM")
+        write_cons = self.app.conf.get("CASSANDRA_WRITE_CONSISTENCY",
+                                       "LOCAL_QUORUM")
+        try:
+            self.read_consistency = getattr(pycassa.ConsistencyLevel, read_cons)
+        except AttributeError:
+            self.read_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
+        try:
+            self.write_consistency = getattr(pycassa.ConsistencyLevel,
+                                             write_cons)
+        except AttributeError:
+            self.write_consistency = pycassa.ConsistencyLevel.LOCAL_QUORUM
+
         if not self.servers or not self.keyspace or not self.column_family:
             raise ImproperlyConfigured(
                     "Cassandra backend not configured.")
@@ -86,7 +100,6 @@ class CassandraBackend(BaseDictBackend):
                 try:
                     return func(*args, **kwargs)
                 except (pycassa.InvalidRequestException,
-                        pycassa.NoServerAvailable,
                         pycassa.TimedOutException,
                         pycassa.UnavailableException,
                         socket.error,
@@ -100,13 +113,12 @@ class CassandraBackend(BaseDictBackend):
 
     def _get_column_family(self):
         if self._column_family is None:
-            conn = pycassa.connect(self.servers,
+            conn = pycassa.connect(self.keyspace, servers=self.servers,
                                    **self.cassandra_options)
             self._column_family = \
-              pycassa.ColumnFamily(conn, self.keyspace,
-                    self.column_family,
-                    read_consistency_level=pycassa.ConsistencyLevel.DCQUORUM,
-                    write_consistency_level=pycassa.ConsistencyLevel.DCQUORUM)
+              pycassa.ColumnFamily(conn, self.column_family,
+                    read_consistency_level=self.read_consistency,
+                    write_consistency_level=self.write_consistency)
         return self._column_family
 
     def process_cleanup(self):
@@ -159,7 +171,7 @@ class CassandraBackend(BaseDictBackend):
                                                      count=2 ** 30))
         columns = cf.client.multiget_slice(cf.keyspace, self._index_keys,
                                            column_parent, slice_pred,
-                                           pycassa.ConsistencyLevel.DCQUORUM)
+                                           self.read_consistency)
 
         index_cols = [c.column.name
                         for c in itertools.chain(*columns.values())]