test_leak.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. from __future__ import print_function
  2. import gc
  3. import os
  4. import sys
  5. import shlex
  6. import subprocess
  7. sys.path.insert(0, os.getcwd())
  8. sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
  9. from nose import SkipTest
  10. from celery import current_app
  11. from celery.five import range
  12. from celery.tests.utils import unittest
  13. import suite # noqa
  14. GET_RSIZE = '/bin/ps -p {pid} -o rss='
  15. class Sizes(list):
  16. def add(self, item):
  17. if item not in self:
  18. self.append(item)
  19. def average(self):
  20. return sum(self) / len(self)
  21. class LeakFunCase(unittest.TestCase):
  22. def setUp(self):
  23. self.app = current_app
  24. self.debug = os.environ.get('TEST_LEAK_DEBUG', False)
  25. def get_rsize(self, cmd=GET_RSIZE):
  26. try:
  27. return int(subprocess.Popen(
  28. shlex.split(cmd.format(pid=os.getpid())),
  29. stdout=subprocess.PIPE).communicate()[0].strip()
  30. )
  31. except OSError as exc:
  32. raise SkipTest(
  33. 'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))
  34. def sample_allocated(self, fun, *args, **kwargs):
  35. before = self.get_rsize()
  36. fun(*args, **kwargs)
  37. gc.collect()
  38. after = self.get_rsize()
  39. return before, after
  40. def appx(self, s, r=1):
  41. """r==1 (10e1): Keep up to hundred kB,
  42. e.g. 16,268MB becomes 16,2MB."""
  43. return int(s / 10.0 ** (r + 1)) / 10.0
  44. def assertFreed(self, n, fun, *args, **kwargs):
  45. # call function first to load lazy modules etc.
  46. fun(*args, **kwargs)
  47. try:
  48. base = self.get_rsize()
  49. first = None
  50. sizes = Sizes()
  51. for i in range(n):
  52. before, after = self.sample_allocated(fun, *args, **kwargs)
  53. if not first:
  54. first = after
  55. if self.debug:
  56. print('{0!r} {1}: before/after: {2}/{3}'.format(
  57. fun, i, before, after))
  58. else:
  59. sys.stderr.write('.')
  60. sizes.add(self.appx(after))
  61. self.assertEqual(gc.collect(), 0)
  62. self.assertEqual(gc.garbage, [])
  63. try:
  64. assert self.appx(first) >= self.appx(after)
  65. except AssertionError:
  66. print('BASE: {0!r} AVG: {1!r} SIZES: {2!r}'.format(
  67. base, sizes.average(), sizes))
  68. raise
  69. finally:
  70. self.app.control.purge()
  71. class test_leaks(LeakFunCase):
  72. def test_task_apply_leak(self, its=1000):
  73. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  74. @self.app.task
  75. def task1():
  76. pass
  77. try:
  78. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  79. except AttributeError:
  80. return self.assertFreed(self.iterations, task1.delay)
  81. self.app.conf.BROKER_POOL_LIMIT = None
  82. try:
  83. self.app._pool = None
  84. self.assertFreed(its, task1.delay)
  85. finally:
  86. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  87. def test_task_apply_leak_with_pool(self, its=1000):
  88. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  89. @self.app.task
  90. def task2():
  91. pass
  92. try:
  93. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  94. except AttributeError:
  95. raise SkipTest('This version does not support autopool')
  96. self.app.conf.BROKER_POOL_LIMIT = 10
  97. try:
  98. self.app._pool = None
  99. self.assertFreed(its, task2.delay)
  100. finally:
  101. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  102. if __name__ == '__main__':
  103. unittest.main()