| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 | 
							- from __future__ import print_function, unicode_literals
 
- import gc
 
- import os
 
- import sys
 
- import shlex
 
- import subprocess
 
- sys.path.insert(0, os.getcwd())
 
- sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
 
- from celery import current_app
 
- from celery.five import range
 
- from celery.tests.case import SkipTest, unittest
 
- import suite  # noqa
 
- GET_RSIZE = b'/bin/ps -p {pid} -o rss='
 
- class Sizes(list):
 
-     def add(self, item):
 
-         if item not in self:
 
-             self.append(item)
 
-     def average(self):
 
-         return sum(self) / len(self)
 
- class LeakFunCase(unittest.TestCase):
 
-     def setUp(self):
 
-         self.app = current_app
 
-         self.debug = os.environ.get('TEST_LEAK_DEBUG', False)
 
-     def get_rsize(self, cmd=GET_RSIZE):
 
-         try:
 
-             return int(subprocess.Popen(
 
-                 shlex.split(cmd.format(pid=os.getpid())),
 
-                 stdout=subprocess.PIPE).communicate()[0].strip()
 
-             )
 
-         except OSError as exc:
 
-             raise SkipTest(
 
-                 'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))
 
-     def sample_allocated(self, fun, *args, **kwargs):
 
-         before = self.get_rsize()
 
-         fun(*args, **kwargs)
 
-         gc.collect()
 
-         after = self.get_rsize()
 
-         return before, after
 
-     def appx(self, s, r=1):
 
-         """r==1 (10e1): Keep up to hundred kB,
 
-         e.g. 16,268MB becomes 16,2MB."""
 
-         return int(s / 10.0 ** (r + 1)) / 10.0
 
-     def assertFreed(self, n, fun, *args, **kwargs):
 
-         # call function first to load lazy modules etc.
 
-         fun(*args, **kwargs)
 
-         try:
 
-             base = self.get_rsize()
 
-             first = None
 
-             sizes = Sizes()
 
-             for i in range(n):
 
-                 before, after = self.sample_allocated(fun, *args, **kwargs)
 
-                 if not first:
 
-                     first = after
 
-                 if self.debug:
 
-                     print('{0!r} {1}: before/after: {2}/{3}'.format(
 
-                           fun, i, before, after))
 
-                 else:
 
-                     sys.stderr.write('.')
 
-                 sizes.add(self.appx(after))
 
-             self.assertEqual(gc.collect(), 0)
 
-             self.assertEqual(gc.garbage, [])
 
-             try:
 
-                 assert self.appx(first) >= self.appx(after)
 
-             except AssertionError:
 
-                 print('BASE: {0!r} AVG: {1!r} SIZES: {2!r}'.format(
 
-                     base, sizes.average(), sizes))
 
-                 raise
 
-         finally:
 
-             self.app.control.purge()
 
- class test_leaks(LeakFunCase):
 
-     def test_task_apply_leak(self, its=1000):
 
-         self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
 
-         @self.app.task
 
-         def task1():
 
-             pass
 
-         try:
 
-             pool_limit = self.app.conf.BROKER_POOL_LIMIT
 
-         except AttributeError:
 
-             return self.assertFreed(self.iterations, task1.delay)
 
-         self.app.conf.BROKER_POOL_LIMIT = None
 
-         try:
 
-             self.app._pool = None
 
-             self.assertFreed(its, task1.delay)
 
-         finally:
 
-             self.app.conf.BROKER_POOL_LIMIT = pool_limit
 
-     def test_task_apply_leak_with_pool(self, its=1000):
 
-         self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
 
-         @self.app.task
 
-         def task2():
 
-             pass
 
-         try:
 
-             pool_limit = self.app.conf.BROKER_POOL_LIMIT
 
-         except AttributeError:
 
-             raise SkipTest('This version does not support autopool')
 
-         self.app.conf.BROKER_POOL_LIMIT = 10
 
-         try:
 
-             self.app._pool = None
 
-             self.assertFreed(its, task2.delay)
 
-         finally:
 
-             self.app.conf.BROKER_POOL_LIMIT = pool_limit
 
- if __name__ == '__main__':
 
-     unittest.main()
 
 
  |