123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- from __future__ import print_function
- import gc
- import os
- import sys
- import shlex
- import subprocess
- sys.path.insert(0, os.getcwd())
- sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
- from celery import current_app
- from celery.five import range
- from celery.tests.case import SkipTest, unittest
- import suite # noqa
# Command template used by ``LeakFunCase.get_rsize`` to read the resident set
# size of a process; ``{pid}`` is filled in with ``os.getpid()`` before the
# command is split with ``shlex`` and executed.
# NOTE(review): ``-o rss=`` presumably prints the value without a header line
# and in kilobytes -- confirm against the target platform's ps(1).
GET_RSIZE = '/bin/ps -p {pid} -o rss='
class Sizes(list):
    """List of size samples that ignores values it already contains."""

    def add(self, item):
        """Append ``item`` unless an equal value is already stored."""
        if item in self:
            return
        self.append(item)

    def average(self):
        """Return the sum of the stored samples divided by their count."""
        total = sum(self)
        count = len(self)
        return total / count
class LeakFunCase(unittest.TestCase):
    """Base test case with helpers for detecting memory growth.

    Memory usage is sampled by running the ``GET_RSIZE`` command against
    the current process before and after calling the function under test.
    """

    def setUp(self):
        self.app = current_app
        # Any non-empty value of TEST_LEAK_DEBUG enables per-iteration output.
        self.debug = os.environ.get('TEST_LEAK_DEBUG', False)

    def get_rsize(self, cmd=GET_RSIZE):
        """Return the integer printed by ``cmd`` for the current process.

        ``cmd`` is a format string receiving ``pid``; it is split with
        ``shlex`` and executed without a shell.

        Raises:
            SkipTest: if the command cannot be executed (``OSError``).
        """
        try:
            return int(subprocess.Popen(
                shlex.split(cmd.format(pid=os.getpid())),
                stdout=subprocess.PIPE).communicate()[0].strip()
            )
        except OSError as exc:
            raise SkipTest(
                'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))

    def sample_allocated(self, fun, *args, **kwargs):
        """Call ``fun`` once and return the ``(before, after)`` sizes.

        A garbage collection is forced after the call so that collectable
        objects do not count towards the ``after`` reading.
        """
        before = self.get_rsize()
        fun(*args, **kwargs)
        gc.collect()
        after = self.get_rsize()
        return before, after

    def appx(self, s, r=1):
        """Coarsen ``s`` so that small fluctuations compare equal.

        r==1 (10e1): Keep up to hundred kB,
        e.g. 16,268MB becomes 16,2MB.
        """
        return int(s / 10.0 ** (r + 1)) / 10.0

    def assertFreed(self, n, fun, *args, **kwargs):
        """Assert that calling ``fun`` ``n`` times does not grow memory.

        ``fun`` is called once as a warm-up, then sampled ``n`` times.  The
        test fails if the coarsened size after the last call is larger than
        the coarsened size after the first sampled call, or if the garbage
        collector finds unreachable objects afterwards.  The app's queues
        are purged when sampling ends, whether it passed or failed.

        Raises:
            ValueError: if ``n`` is not positive (no samples to compare).
            AssertionError: if memory grew or garbage was left behind.
        """
        if n <= 0:
            # Without at least one sample there is nothing to compare; the
            # comparison below would otherwise fail with an obscure error.
            raise ValueError(
                'assertFreed needs at least one iteration, got {0!r}'.format(
                    n))

        # call function first to load lazy modules etc.
        fun(*args, **kwargs)

        try:
            base = self.get_rsize()
            first = None
            sizes = Sizes()
            for i in range(n):
                before, after = self.sample_allocated(fun, *args, **kwargs)
                # Compare against None: a reading of 0 is still a reading.
                if first is None:
                    first = after
                if self.debug:
                    print('{0!r} {1}: before/after: {2}/{3}'.format(
                        fun, i, before, after))
                else:
                    sys.stderr.write('.')
                sizes.add(self.appx(after))
            self.assertEqual(gc.collect(), 0)
            self.assertEqual(gc.garbage, [])
            # Explicit comparison instead of a bare ``assert`` so the check
            # still runs when Python is started with -O.
            if self.appx(first) < self.appx(after):
                print('BASE: {0!r} AVG: {1!r} SIZES: {2!r}'.format(
                    base, sizes.average(), sizes))
                self.fail('memory grew from {0!r} to {1!r}'.format(
                    self.appx(first), self.appx(after)))
        finally:
            self.app.control.purge()
class test_leaks(LeakFunCase):
    """Leak checks for ``task.delay`` with and without a broker pool."""

    def test_task_apply_leak(self, its=1000):
        """Call ``task.delay`` ``its`` times with the broker pool disabled.

        If the app configuration has no ``BROKER_POOL_LIMIT`` attribute the
        check runs against the configuration as-is.
        """
        self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')

        @self.app.task
        def task1():
            pass

        try:
            pool_limit = self.app.conf.BROKER_POOL_LIMIT
        except AttributeError:
            # Use the ``its`` argument here: the previous ``self.iterations``
            # is never defined and made this fallback raise AttributeError.
            return self.assertFreed(its, task1.delay)

        self.app.conf.BROKER_POOL_LIMIT = None
        try:
            # Drop the cached pool so the new limit is used.
            # NOTE(review): assumes the app recreates ``_pool`` lazily --
            # confirm against the app implementation.
            self.app._pool = None
            self.assertFreed(its, task1.delay)
        finally:
            self.app.conf.BROKER_POOL_LIMIT = pool_limit

    def test_task_apply_leak_with_pool(self, its=1000):
        """Call ``task.delay`` ``its`` times with a broker pool limit of 10.

        Skipped when the app configuration has no ``BROKER_POOL_LIMIT``.
        """
        self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')

        @self.app.task
        def task2():
            pass

        try:
            pool_limit = self.app.conf.BROKER_POOL_LIMIT
        except AttributeError:
            raise SkipTest('This version does not support autopool')

        self.app.conf.BROKER_POOL_LIMIT = 10
        try:
            # Drop the cached pool so the new limit is used.
            # NOTE(review): assumes the app recreates ``_pool`` lazily --
            # confirm against the app implementation.
            self.app._pool = None
            self.assertFreed(its, task2.delay)
        finally:
            self.app.conf.BROKER_POOL_LIMIT = pool_limit
# Run the leak tests through the unittest runner when executed as a script.
if __name__ == '__main__':
    unittest.main()
|