| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 | from __future__ import print_function, unicode_literalsimport gcimport osimport sysimport shleximport subprocessfrom celery import current_appfrom celery.five import rangefrom celery.tests.case import SkipTest, unittestimport suite  # noqaGET_RSIZE = b'/bin/ps -p {pid} -o rss='class Sizes(list):    def add(self, item):        if item not in self:            self.append(item)    def average(self):        return sum(self) / len(self)class LeakFunCase(unittest.TestCase):    def setUp(self):        self.app = current_app        self.debug = os.environ.get('TEST_LEAK_DEBUG', False)    def get_rsize(self, cmd=GET_RSIZE):        try:            return int(subprocess.Popen(                shlex.split(cmd.format(pid=os.getpid())),                stdout=subprocess.PIPE).communicate()[0].strip()            )        except OSError as exc:            raise SkipTest(                'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))    def sample_allocated(self, fun, *args, **kwargs):        before = self.get_rsize()        fun(*args, **kwargs)        gc.collect()        after = self.get_rsize()        return before, after    def appx(self, s, r=1):        """r==1 (10e1): Keep up to hundred kB,        e.g. 16,268MB becomes 16,2MB."""        return int(s / 10.0 ** (r + 1)) / 10.0    def assertFreed(self, n, fun, *args, **kwargs):        # call function first to load lazy modules etc.        fun(*args, **kwargs)        try:            base = self.get_rsize()            first = None            sizes = Sizes()            for i in range(n):                before, after = self.sample_allocated(fun, *args, **kwargs)                if not first:                    first = after                if self.debug:                    print('{0!r} {1}: before/after: {2}/{3}'.format(                          fun, i, before, after))                else:                    sys.stderr.write('.')                sizes.add(self.appx(after))            self.assertEqual(gc.collect(), 0)            self.assertEqual(gc.garbage, [])            try:                assert self.appx(first) >= self.appx(after)            except AssertionError:                print('BASE: {0!r} AVG: {1!r} SIZES: {2!r}'.format(                    base, sizes.average(), sizes))                raise        finally:            self.app.control.purge()class test_leaks(LeakFunCase):    def test_task_apply_leak(self, its=1000):        self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')        @self.app.task        def task1():            pass        try:            pool_limit = self.app.conf.BROKER_POOL_LIMIT        except AttributeError:            return self.assertFreed(self.iterations, task1.delay)        self.app.conf.BROKER_POOL_LIMIT = None        try:            self.app._pool = None            self.assertFreed(its, task1.delay)        finally:            self.app.conf.BROKER_POOL_LIMIT = pool_limit    def test_task_apply_leak_with_pool(self, its=1000):        self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')        @self.app.task        def task2():            pass        try:            pool_limit = self.app.conf.BROKER_POOL_LIMIT        except AttributeError:            raise SkipTest('This version does not support autopool')        self.app.conf.BROKER_POOL_LIMIT = 10        try:            self.app._pool = None            self.assertFreed(its, task2.delay)        finally:            self.app.conf.BROKER_POOL_LIMIT = pool_limitif __name__ == '__main__':    unittest.main()
 |