Procházet zdrojové kódy

Merge branch 'master' of github.com:celery/celery

Ask Solem před 9 roky
rodič
revize
5a75d5980d

+ 10 - 7
celery/backends/cassandra.py

@@ -125,13 +125,14 @@ class CassandraBackend(BaseBackend):
         self._session = None
         self._write_stmt = None
         self._read_stmt = None
+        self._make_stmt = None
 
     def process_cleanup(self):
         if self._connection is not None:
-            self._connection = None
-        if self._session is not None:
-            self._session.shutdown()
-            self._session = None
+            self._connection.shutdown() # also shuts down _session
+
+        self._connection = None
+        self._session = None
 
     def _get_connection(self, write=False):
         """Prepare the connection for action
@@ -171,6 +172,7 @@ class CassandraBackend(BaseBackend):
                         Q_CREATE_RESULT_TABLE.format(table=self.table),
                     )
                     self._make_stmt.consistency_level = self.write_consistency
+
                     try:
                         self._session.execute(self._make_stmt)
                     except cassandra.AlreadyExists:
@@ -179,10 +181,11 @@ class CassandraBackend(BaseBackend):
             except cassandra.OperationTimedOut:
                 # a heavily loaded or gone Cassandra cluster failed to respond.
                 # leave this class in a consistent state
-                self._connection = None
-                if self._session is not None:
-                    self._session.shutdown()
+                if self._connection is not None:
+                    self._connection.shutdown()     # also shuts down _session
 
+                self._connection = None
+                self._session = None
                 raise   # we did fail after all - reraise
 
     def _store_result(self, task_id, result, status,

+ 39 - 0
celery/tests/backends/test_cassandra.py

@@ -41,6 +41,7 @@ class test_CassandraBackend(AppCase):
         with mock_module(*CASSANDRA_MODULES):
             from celery.backends import cassandra as mod
             mod.cassandra = Mock()
+
             cons = mod.cassandra.ConsistencyLevel = Object()
             cons.LOCAL_QUORUM = 'foo'
 
@@ -68,6 +69,7 @@ class test_CassandraBackend(AppCase):
         with mock_module(*CASSANDRA_MODULES):
             from celery.backends import cassandra as mod
             mod.cassandra = Mock()
+
             x = mod.CassandraBackend(app=self.app)
             x._connection = True
             session = x._session = Mock()
@@ -120,6 +122,9 @@ class test_CassandraBackend(AppCase):
                 def connect(self, *args, **kwargs):
                     raise OTOExc()
 
+                def shutdown(self):
+                    pass
+
             mod.cassandra = Mock()
             mod.cassandra.OperationTimedOut = OTOExc
             mod.cassandra.cluster = Mock()
@@ -133,3 +138,37 @@ class test_CassandraBackend(AppCase):
             self.assertIsNone(x._session)
 
             x.process_cleanup()  # should not raise
+
+
+    def test_please_free_memory(self):
+        """
+        Ensure that Cluster object IS shut down.
+        """
+        with mock_module(*CASSANDRA_MODULES):
+            from celery.backends import cassandra as mod
+
+            class RAMHoggingCluster(object):
+
+                objects_alive = 0
+
+                def __init__(self, *args, **kwargs):
+                    pass
+
+                def connect(self, *args, **kwargs):
+                    RAMHoggingCluster.objects_alive += 1
+                    return Mock()
+
+                def shutdown(self):
+                    RAMHoggingCluster.objects_alive -= 1
+
+            mod.cassandra = Mock()
+
+            mod.cassandra.cluster = Mock()
+            mod.cassandra.cluster.Cluster = RAMHoggingCluster
+
+            for x in range(0, 10):
+                x = mod.CassandraBackend(app=self.app)
+                x._store_result('task_id', 'result', states.SUCCESS)
+                x.process_cleanup()
+
+            self.assertEquals(RAMHoggingCluster.objects_alive, 0)