Piotr Maślanka 9 vuotta sitten
vanhempi
commit
0e4890cfa4
2 muutettua tiedostoa jossa 17 lisäystä ja 8 poistoa
  1. 10 8
      celery/backends/new_cassandra.py
  2. 7 0
      celery/tests/backends/test_new_cassandra.py

+ 10 - 8
celery/backends/new_cassandra.py

@@ -125,14 +125,14 @@ class CassandraBackend(BaseBackend):
         self._session = None
         self._write_stmt = None
         self._read_stmt = None
+        self._make_stmt = None
 
     def process_cleanup(self):
-        if self._session is not None:
-            self._session.shutdown()
-            self._session = None
         if self._connection is not None:
-            self._connection.shutdown()
-            self._connection = None
+            self._connection.shutdown() # also shuts down _session
+
+        self._connection = None
+        self._session = None
 
     def _get_connection(self, write=False):
         """Prepare the connection for action
@@ -172,6 +172,7 @@ class CassandraBackend(BaseBackend):
                         Q_CREATE_RESULT_TABLE.format(table=self.table),
                     )
                     self._make_stmt.consistency_level = self.write_consistency
+
                     try:
                         self._session.execute(self._make_stmt)
                     except cassandra.AlreadyExists:
@@ -180,10 +181,11 @@ class CassandraBackend(BaseBackend):
             except cassandra.OperationTimedOut:
                 # a heavily loaded or gone Cassandra cluster failed to respond.
                 # leave this class in a consistent state
-                self._connection = None
-                if self._session is not None:
-                    self._session.shutdown()
+                if self._connection is not None:
+                    self._connection.shutdown()     # also shuts down _session
 
+                self._connection = None
+                self._session = None
                 raise   # we did fail after all - reraise
 
     def _store_result(self, task_id, result, status,

+ 7 - 0
celery/tests/backends/test_new_cassandra.py

@@ -41,6 +41,7 @@ class test_CassandraBackend(AppCase):
         with mock_module(*CASSANDRA_MODULES):
             from celery.backends import new_cassandra as mod
             mod.cassandra = Mock()
+
             cons = mod.cassandra.ConsistencyLevel = Object()
             cons.LOCAL_QUORUM = 'foo'
 
@@ -68,6 +69,7 @@ class test_CassandraBackend(AppCase):
         with mock_module(*CASSANDRA_MODULES):
             from celery.backends import new_cassandra as mod
             mod.cassandra = Mock()
+
             x = mod.CassandraBackend(app=self.app)
             x._connection = True
             session = x._session = Mock()
@@ -120,6 +122,9 @@ class test_CassandraBackend(AppCase):
                 def connect(self, *args, **kwargs):
                     raise OTOExc()
 
+                def shutdown(self):
+                    pass
+
             mod.cassandra = Mock()
             mod.cassandra.OperationTimedOut = OTOExc
             mod.cassandra.cluster = Mock()
@@ -134,6 +139,7 @@ class test_CassandraBackend(AppCase):
 
             x.process_cleanup()  # should not raise
 
+
     def test_please_free_memory(self):
         """
         Ensure that Cluster object IS shut down.
@@ -156,6 +162,7 @@ class test_CassandraBackend(AppCase):
                     RAMHoggingCluster.objects_alive -= 1
 
             mod.cassandra = Mock()
+
             mod.cassandra.cluster = Mock()
             mod.cassandra.cluster.Cluster = RAMHoggingCluster