Piotr Maślanka 9 éve
szülő
commit
3d36b78eb4

+ 43 - 33
celery/backends/new_cassandra.py

@@ -140,40 +140,50 @@ class CassandraBackend(BaseBackend):
 
         """
         if self._connection is None:
-            self._connection = cassandra.cluster.Cluster(self.servers,
-                                                         port=self.port)
-            self._session = self._connection.connect(self.keyspace)
-
-            # We are forced to do concatenation below, as formatting would
-            # blow up on superficial %s that will be processed by Cassandra
-            self._write_stmt = cassandra.query.SimpleStatement(
-                Q_INSERT_RESULT.format(
-                    table=self.table, expires=self.cqlexpires),
-            )
-            self._write_stmt.consistency_level = self.write_consistency
-
-            self._read_stmt = cassandra.query.SimpleStatement(
-                Q_SELECT_RESULT.format(table=self.table),
-            )
-            self._read_stmt.consistency_level = self.read_consistency
-
-            if write:
-                # Only possible writers "workers" are allowed to issue
-                # CREATE TABLE. This is to prevent conflicting situations
-                # where both task-creator and task-executor would issue it
-                # at the same time.
-
-                # Anyway, if you are doing anything critical, you should
-                # have probably created this table in advance, in which case
-                # this query will be a no-op (instant fail with AlreadyExists)
-                self._make_stmt = cassandra.query.SimpleStatement(
-                    Q_CREATE_RESULT_TABLE.format(table=self.table),
+            try:
+                self._connection = cassandra.cluster.Cluster(self.servers,
+                                                             port=self.port)
+                self._session = self._connection.connect(self.keyspace)
+
+                # We are forced to do concatenation below, as formatting would
+                # blow up on superficial %s that will be processed by Cassandra
+                self._write_stmt = cassandra.query.SimpleStatement(
+                    Q_INSERT_RESULT.format(
+                        table=self.table, expires=self.cqlexpires),
                 )
-                self._make_stmt.consistency_level = self.write_consistency
-                try:
-                    self._session.execute(self._make_stmt)
-                except cassandra.AlreadyExists:
-                    pass
+                self._write_stmt.consistency_level = self.write_consistency
+
+                self._read_stmt = cassandra.query.SimpleStatement(
+                    Q_SELECT_RESULT.format(table=self.table),
+                )
+                self._read_stmt.consistency_level = self.read_consistency
+
+                if write:
+                    # Only possible writers "workers" are allowed to issue
+                    # CREATE TABLE. This is to prevent conflicting situations
+                    # where both task-creator and task-executor would issue it
+                    # at the same time.
+
+                    # Anyway, if you are doing anything critical, you should
+                    # have probably created this table in advance, in which case
+                    # this query will be a no-op (instant fail with AlreadyExists)
+                    self._make_stmt = cassandra.query.SimpleStatement(
+                        Q_CREATE_RESULT_TABLE.format(table=self.table),
+                    )
+                    self._make_stmt.consistency_level = self.write_consistency
+                    try:
+                        self._session.execute(self._make_stmt)
+                    except cassandra.AlreadyExists:
+                        pass
+
+            except cassandra.OperationTimedOut:
+                # a heavily loaded or gone Cassandra cluster failed to respond.
+                # leave this class in a consistent state
+                self._connection = None
+                if self._session is not None:
+                    self._session.shutdown()
+
+                raise   # we did fail after all - reraise
 
     def _store_result(self, task_id, result, status,
                       traceback=None, request=None, **kwargs):

+ 28 - 0
celery/tests/backends/test_new_cassandra.py

@@ -102,3 +102,31 @@ class test_CassandraBackend(AppCase):
 
             self.assertIsNone(x._connection)
             self.assertIsNone(x._session)
+
+    def test_timeouting_cluster(self):
+        """
+        Tests behaviour when Cluster.connect raises cassandra.OperationTimedOut
+        """
+        with mock_module(*CASSANDRA_MODULES):
+            from celery.backends import new_cassandra as mod
+
+            class OTOExc(Exception):
+                pass
+
+            class VeryFaultyCluster(object):
+                def __init__(self, *args, **kwargs):
+                    pass
+
+                def connect(self, *args, **kwargs):
+                    raise OTOExc()
+
+            mod.cassandra = Mock()
+            mod.cassandra.OperationTimedOut = OTOExc
+            mod.cassandra.cluster = Mock()
+            mod.cassandra.cluster.Cluster = VeryFaultyCluster
+
+            x = mod.CassandraBackend(app=self.app)
+
+            self.assertRaises(OTOExc, lambda: x._store_result('task_id', 'result', states.SUCCESS))
+            self.assertIsNone(x._connection)
+            self.assertIsNone(x._session)