test_leak.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. from __future__ import print_function
  2. import gc
  3. import os
  4. import sys
  5. import shlex
  6. import subprocess
  7. sys.path.insert(0, os.getcwd())
  8. sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
  9. from nose import SkipTest
  10. from celery import current_app
  11. from celery.five import range
  12. from celery.tests.utils import unittest
  13. import suite # noqa
  14. GET_RSIZE = '/bin/ps -p {pid} -o rss='
  15. QUICKTEST = int(os.environ.get('QUICKTEST', 0))
  16. class Sizes(list):
  17. def add(self, item):
  18. if item not in self:
  19. self.append(item)
  20. def average(self):
  21. return sum(self) / len(self)
  22. class LeakFunCase(unittest.TestCase):
  23. def setUp(self):
  24. self.app = current_app
  25. self.debug = os.environ.get('TEST_LEAK_DEBUG', False)
  26. def get_rsize(self, cmd=GET_RSIZE):
  27. try:
  28. return int(subprocess.Popen(
  29. shlex.split(cmd.format(pid=os.getpid())),
  30. stdout=subprocess.PIPE).communicate()[0].strip()
  31. )
  32. except OSError as exc:
  33. raise SkipTest(
  34. 'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))
  35. def sample_allocated(self, fun, *args, **kwargs):
  36. before = self.get_rsize()
  37. fun(*args, **kwargs)
  38. gc.collect()
  39. after = self.get_rsize()
  40. return before, after
  41. def appx(self, s, r=1):
  42. """r==1 (10e1): Keep up to hundred kB,
  43. e.g. 16,268MB becomes 16,2MB."""
  44. return int(s / 10.0 ** (r + 1)) / 10.0
  45. def assertFreed(self, n, fun, *args, **kwargs):
  46. # call function first to load lazy modules etc.
  47. fun(*args, **kwargs)
  48. try:
  49. base = self.get_rsize()
  50. first = None
  51. sizes = Sizes()
  52. for i in range(n):
  53. before, after = self.sample_allocated(fun, *args, **kwargs)
  54. if not first:
  55. first = after
  56. if self.debug:
  57. print('{0!r} {1}: before/after: {2}/{3}'.format(
  58. fun, i, before, after))
  59. else:
  60. sys.stderr.write('.')
  61. sizes.add(self.appx(after))
  62. self.assertEqual(gc.collect(), 0)
  63. self.assertEqual(gc.garbage, [])
  64. try:
  65. assert self.appx(first) >= self.appx(after)
  66. except AssertionError:
  67. print('BASE: {0!r} AVG: {1!r} SIZES: {2!r}'.format(
  68. base, sizes.average(), sizes))
  69. raise
  70. finally:
  71. self.app.control.purge()
  72. class test_leaks(LeakFunCase):
  73. def test_task_apply_leak(self):
  74. its = QUICKTEST and 10 or 1000
  75. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  76. @self.app.task
  77. def task1():
  78. pass
  79. try:
  80. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  81. except AttributeError:
  82. return self.assertFreed(self.iterations, task1.delay)
  83. self.app.conf.BROKER_POOL_LIMIT = None
  84. try:
  85. self.app._pool = None
  86. self.assertFreed(its, task1.delay)
  87. finally:
  88. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  89. def test_task_apply_leak_with_pool(self):
  90. its = QUICKTEST and 10 or 1000
  91. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  92. @self.app.task
  93. def task2():
  94. pass
  95. try:
  96. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  97. except AttributeError:
  98. raise SkipTest('This version does not support autopool')
  99. self.app.conf.BROKER_POOL_LIMIT = 10
  100. try:
  101. self.app._pool = None
  102. self.assertFreed(its, task2.delay)
  103. finally:
  104. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  105. if __name__ == '__main__':
  106. unittest.main()