test_leak.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. from __future__ import print_function
  2. import gc
  3. import os
  4. import sys
  5. import shlex
  6. import subprocess
  7. sys.path.insert(0, os.getcwd())
  8. sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
  9. from nose import SkipTest
  10. from celery import current_app
  11. from celery.five import range
  12. from celery.tests.utils import unittest
  13. import suite
  14. GET_RSIZE = '/bin/ps -p {pid} -o rss='
  15. QUICKTEST = int(os.environ.get('QUICKTEST', 0))
  16. class Sizes(list):
  17. def add(self, item):
  18. if item not in self:
  19. self.append(item)
  20. def average(self):
  21. return sum(self) / len(self)
  22. class LeakFunCase(unittest.TestCase):
  23. def setUp(self):
  24. self.app = current_app
  25. self.debug = os.environ.get('TEST_LEAK_DEBUG', False)
  26. def get_rsize(self, cmd=GET_RSIZE):
  27. try:
  28. return int(subprocess.Popen(
  29. shlex.split(cmd.format(pid=os.getpid())),
  30. stdout=subprocess.PIPE).communicate()[0].strip())
  31. except OSError as exc:
  32. raise SkipTest(
  33. 'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))
  34. def sample_allocated(self, fun, *args, **kwargs):
  35. before = self.get_rsize()
  36. fun(*args, **kwargs)
  37. gc.collect()
  38. after = self.get_rsize()
  39. return before, after
  40. def appx(self, s, r=1):
  41. """r==1 (10e1): Keep up to hundred kB,
  42. e.g. 16,268MB becomes 16,2MB."""
  43. return int(s / 10.0 ** (r + 1)) / 10.0
  44. def assertFreed(self, n, fun, *args, **kwargs):
  45. # call function first to load lazy modules etc.
  46. fun(*args, **kwargs)
  47. try:
  48. base = self.get_rsize()
  49. first = None
  50. sizes = Sizes()
  51. for i in range(n):
  52. before, after = self.sample_allocated(fun, *args, **kwargs)
  53. if not first:
  54. first = after
  55. if self.debug:
  56. print('{0!r} {1}: before/after: {2}/{3}'.format(
  57. fun, i, before, after))
  58. else:
  59. sys.stderr.write('.')
  60. sizes.add(self.appx(after))
  61. self.assertEqual(gc.collect(), 0)
  62. self.assertEqual(gc.garbage, [])
  63. try:
  64. assert self.appx(first) >= self.appx(after)
  65. except AssertionError:
  66. print('BASE: {0!r} AVG: {1!r} SIZES: {2!r}'.format(
  67. base, sizes.average(), sizes))
  68. raise
  69. finally:
  70. self.app.control.purge()
  71. class test_leaks(LeakFunCase):
  72. def test_task_apply_leak(self):
  73. its = QUICKTEST and 10 or 1000
  74. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  75. @self.app.task
  76. def task1():
  77. pass
  78. try:
  79. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  80. except AttributeError:
  81. return self.assertFreed(self.iterations, foo.delay)
  82. self.app.conf.BROKER_POOL_LIMIT = None
  83. try:
  84. self.app._pool = None
  85. self.assertFreed(its, task1.delay)
  86. finally:
  87. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  88. def test_task_apply_leak_with_pool(self):
  89. its = QUICKTEST and 10 or 1000
  90. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  91. @self.app.task
  92. def task2():
  93. pass
  94. try:
  95. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  96. except AttributeError:
  97. raise SkipTest('This version does not support autopool')
  98. self.app.conf.BROKER_POOL_LIMIT = 10
  99. try:
  100. self.app._pool = None
  101. self.assertFreed(its, task2.delay)
  102. finally:
  103. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  104. if __name__ == '__main__':
  105. unittest.main()