test_leak.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import gc
  2. import os
  3. import sys
  4. import shlex
  5. import subprocess
  6. from celery import current_app
  7. from celery.tests.case import SkipTest, unittest
  8. import suite # noqa
  9. GET_RSIZE = b'/bin/ps -p {pid} -o rss='
  10. class Sizes(list):
  11. def add(self, item):
  12. if item not in self:
  13. self.append(item)
  14. def average(self):
  15. return sum(self) / len(self)
  16. class LeakFunCase(unittest.TestCase):
  17. def setUp(self):
  18. self.app = current_app
  19. self.debug = os.environ.get('TEST_LEAK_DEBUG', False)
  20. def get_rsize(self, cmd=GET_RSIZE):
  21. try:
  22. return int(subprocess.Popen(
  23. shlex.split(cmd.format(pid=os.getpid())),
  24. stdout=subprocess.PIPE).communicate()[0].strip()
  25. )
  26. except OSError as exc:
  27. raise SkipTest(
  28. 'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))
  29. def sample_allocated(self, fun, *args, **kwargs):
  30. before = self.get_rsize()
  31. fun(*args, **kwargs)
  32. gc.collect()
  33. after = self.get_rsize()
  34. return before, after
  35. def appx(self, s, r=1):
  36. """r==1 (10e1): Keep up to hundred kB,
  37. e.g. 16,268MB becomes 16,2MB."""
  38. return int(s / 10.0 ** (r + 1)) / 10.0
  39. def assertFreed(self, n, fun, *args, **kwargs):
  40. # call function first to load lazy modules etc.
  41. fun(*args, **kwargs)
  42. try:
  43. base = self.get_rsize()
  44. first = None
  45. sizes = Sizes()
  46. for i in range(n):
  47. before, after = self.sample_allocated(fun, *args, **kwargs)
  48. if not first:
  49. first = after
  50. if self.debug:
  51. print('{0!r} {1}: before/after: {2}/{3}'.format(
  52. fun, i, before, after))
  53. else:
  54. sys.stderr.write('.')
  55. sizes.add(self.appx(after))
  56. self.assertEqual(gc.collect(), 0)
  57. self.assertEqual(gc.garbage, [])
  58. try:
  59. assert self.appx(first) >= self.appx(after)
  60. except AssertionError:
  61. print('base: {0!r} avg: {1!r} sizes: {2!r}'.format(
  62. base, sizes.average(), sizes))
  63. raise
  64. finally:
  65. self.app.control.purge()
  66. class test_leaks(LeakFunCase):
  67. def test_task_apply_leak(self, its=1000):
  68. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  69. @self.app.task
  70. def task1():
  71. pass
  72. try:
  73. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  74. except AttributeError:
  75. return self.assertFreed(self.iterations, task1.delay)
  76. self.app.conf.BROKER_POOL_LIMIT = None
  77. try:
  78. self.app._pool = None
  79. self.assertFreed(its, task1.delay)
  80. finally:
  81. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  82. def test_task_apply_leak_with_pool(self, its=1000):
  83. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  84. @self.app.task
  85. def task2():
  86. pass
  87. try:
  88. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  89. except AttributeError:
  90. raise SkipTest('This version does not support autopool')
  91. self.app.conf.BROKER_POOL_LIMIT = 10
  92. try:
  93. self.app._pool = None
  94. self.assertFreed(its, task2.delay)
  95. finally:
  96. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  97. if __name__ == '__main__':
  98. unittest.main()