Browse Source

Adds functional test testing for memory leaks while publishing tasks

Ask Solem 14 years ago
parent
commit
fb31038500
1 changed files with 131 additions and 0 deletions
  1. 131 0
      funtests/suite/test_leak.py

+ 131 - 0
funtests/suite/test_leak.py

@@ -0,0 +1,131 @@
+import gc
+import os
+import sys
+import shlex
+import subprocess
+
+sys.path.insert(0, os.getcwd())
+sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
+
+from nose import SkipTest
+
+from celery import current_app
+from celery.tests.utils import unittest
+
+import suite
+
+GET_RSIZE = "/bin/ps -p %(pid)s -o rss="
+QUICKTEST = int(os.environ.get("QUICKTEST", 0))
+
+
+class Sizes(list):
+
+    def add(self, item):
+        if item not in self:
+            self.append(item)
+
+    def average(self):
+        return sum(self) / len(self)
+
+
+class LeakFunCase(unittest.TestCase):
+
+    def setUp(self):
+        self.app = current_app
+        self.debug = os.environ.get("TEST_LEAK_DEBUG", False)
+
+    def get_rsize(self, cmd=GET_RSIZE):
+        try:
+            return int(subprocess.Popen(
+                        shlex.split(cmd % {"pid": os.getpid()}),
+                            stdout=subprocess.PIPE).communicate()[0].strip())
+        except OSError, exc:
+            raise SkipTest("Can't execute command: %r: %r" % (cmd, exc))
+
+    def sample_allocated(self, fun, *args, **kwargs):
+        before = self.get_rsize()
+
+        fun(*args, **kwargs)
+        gc.collect()
+        after = self.get_rsize()
+        return before, after
+
+    def appx(self, s, r=1):
+        """r==1 (10e1): Keep up to hundred kB,
+        e.g. 16,268MB becomes 16,2MB."""
+        return int(s / 10.0 ** (r + 1)) / 10.0
+
+    def assertFreed(self, n, fun, *args, **kwargs):
+        # call function first to load lazy modules etc.
+        fun(*args, **kwargs)
+
+        try:
+            base = self.get_rsize()
+            first = None
+            sizes = Sizes()
+            for i in xrange(n):
+                before, after = self.sample_allocated(fun, *args, **kwargs)
+                if not first:
+                    first = after
+                if self.debug:
+                    print("%r %s: before/after: %s/%s" % (
+                            fun, i, before, after))
+                else:
+                    sys.stderr.write(".")
+                sizes.add(self.appx(after))
+            self.assertEqual(gc.collect(), 0)
+            self.assertEqual(gc.garbage, [])
+            try:
+                assert self.appx(first) >= self.appx(after)
+            except AssertionError:
+                print("BASE: %r AVG: %r SIZES: %r" % (
+                    base, sizes.average(), sizes, ))
+                raise
+        finally:
+            self.app.control.discard_all()
+
+
+class test_leaks(LeakFunCase):
+
+    def test_task_apply_leak(self):
+        its = QUICKTEST and 10 or 1000
+        self.assertNotEqual(self.app.conf.BROKER_BACKEND, "memory")
+
+        @self.app.task
+        def task1():
+            pass
+
+        try:
+            pool_limit = self.app.conf.BROKER_POOL_LIMIT
+        except AttributeError:
+            return self.assertFreed(self.iterations, foo.delay)
+
+        self.app.conf.BROKER_POOL_LIMIT = None
+        try:
+            self.app._pool = None
+            self.assertFreed(its, task1.delay)
+        finally:
+            self.app.conf.BROKER_POOL_LIMIT = pool_limit
+
+    def test_task_apply_leak_with_pool(self):
+        its = QUICKTEST and 10 or 1000
+        self.assertNotEqual(self.app.conf.BROKER_BACKEND, "memory")
+
+        @self.app.task
+        def task2():
+            pass
+
+        try:
+            pool_limit = self.app.conf.BROKER_POOL_LIMIT
+        except AttributeError:
+            raise SkipTest("This version does not support autopool")
+
+        self.app.conf.BROKER_POOL_LIMIT = 10
+        try:
+            self.app._pool = None
+            self.assertFreed(its, task2.delay)
+        finally:
+            self.app.conf.BROKER_POOL_LIMIT = pool_limit
+
+if __name__ == "__main__":
+    unittest.main()