Piotr Maślanka 9 năm trước cách đây
mục cha
commit
ff4e1a9e81

+ 3 - 2
celery/backends/new_cassandra.py

@@ -127,11 +127,12 @@ class CassandraBackend(BaseBackend):
         self._read_stmt = None
 
     def process_cleanup(self):
-        if self._connection is not None:
-            self._connection = None
         if self._session is not None:
             self._session.shutdown()
             self._session = None
+        if self._connection is not None:
+            self._connection.shutdown()
+            self._connection = None
 
     def _get_connection(self, write=False):
         """Prepare the connection for action

+ 32 - 0
celery/tests/backends/test_new_cassandra.py

@@ -133,3 +133,35 @@ class test_CassandraBackend(AppCase):
             self.assertIsNone(x._session)
 
             x.process_cleanup()  # should not raise
+
+    def test_please_free_memory(self):
+        """
+        Ensure that Cluster object IS shut down.
+        """
+        with mock_module(*CASSANDRA_MODULES):
+            from celery.backends import new_cassandra as mod
+
+            class RAMHoggingCluster(object):
+
+                objects_alive = 0
+
+                def __init__(self, *args, **kwargs):
+                    pass
+
+                def connect(self, *args, **kwargs):
+                    RAMHoggingCluster.objects_alive += 1
+                    return Mock()
+
+                def shutdown(self):
+                    RAMHoggingCluster.objects_alive -= 1
+
+            mod.cassandra = Mock()
+            mod.cassandra.cluster = Mock()
+            mod.cassandra.cluster.Cluster = RAMHoggingCluster
+
+            for x in xrange(0, 10):
+                x = mod.CassandraBackend(app=self.app)
+                x._store_result('task_id', 'result', states.SUCCESS)
+                x.process_cleanup()
+
+            self.assertEquals(RAMHoggingCluster.objects_alive, 0)