test_leak.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. from __future__ import absolute_import, print_function, unicode_literals
  2. import gc
  3. import os
  4. import sys
  5. import shlex
  6. import subprocess
  7. import unittest
  8. from case import Case
  9. from case.skip import SkipTest
  10. from celery import current_app
  11. from celery.five import range
  12. import suite # noqa
# Command template run by ``LeakFunCase.get_rsize``; the ``{pid}`` placeholder
# is filled in with the current process id before the command is executed.
# NOTE(review): this is a bytes literal, and bytes has no ``.format()`` on
# Python 3 -- confirm that every consumer handles that.
GET_RSIZE = b'/bin/ps -p {pid} -o rss='
  14. class Sizes(list):
  15. def add(self, item):
  16. if item not in self:
  17. self.append(item)
  18. def average(self):
  19. return sum(self) / len(self)
  20. class LeakFunCase(Case):
  21. def setUp(self):
  22. self.app = current_app
  23. self.debug = os.environ.get('TEST_LEAK_DEBUG', False)
  24. def get_rsize(self, cmd=GET_RSIZE):
  25. try:
  26. return int(subprocess.Popen(
  27. shlex.split(cmd.format(pid=os.getpid())),
  28. stdout=subprocess.PIPE).communicate()[0].strip()
  29. )
  30. except OSError as exc:
  31. raise SkipTest(
  32. 'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))
  33. def sample_allocated(self, fun, *args, **kwargs):
  34. before = self.get_rsize()
  35. fun(*args, **kwargs)
  36. gc.collect()
  37. after = self.get_rsize()
  38. return before, after
  39. def appx(self, s, r=1):
  40. """r==1 (10e1): Keep up to hundred kB (e.g., 16,268MB
  41. becomes 16,2MB)."""
  42. return int(s / 10.0 ** (r + 1)) / 10.0
  43. def assertFreed(self, n, fun, *args, **kwargs):
  44. # call function first to load lazy modules etc.
  45. fun(*args, **kwargs)
  46. try:
  47. base = self.get_rsize()
  48. first = None
  49. sizes = Sizes()
  50. for i in range(n):
  51. before, after = self.sample_allocated(fun, *args, **kwargs)
  52. if not first:
  53. first = after
  54. if self.debug:
  55. print('{0!r} {1}: before/after: {2}/{3}'.format(
  56. fun, i, before, after))
  57. else:
  58. sys.stderr.write('.')
  59. sizes.add(self.appx(after))
  60. self.assertEqual(gc.collect(), 0)
  61. self.assertEqual(gc.garbage, [])
  62. try:
  63. assert self.appx(first) >= self.appx(after)
  64. except AssertionError:
  65. print('base: {0!r} avg: {1!r} sizes: {2!r}'.format(
  66. base, sizes.average(), sizes))
  67. raise
  68. finally:
  69. self.app.control.purge()
  70. class test_leaks(LeakFunCase):
  71. def test_task_apply_leak(self, its=1000):
  72. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  73. @self.app.task
  74. def task1():
  75. pass
  76. try:
  77. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  78. except AttributeError:
  79. return self.assertFreed(self.iterations, task1.delay)
  80. self.app.conf.BROKER_POOL_LIMIT = None
  81. try:
  82. self.app._pool = None
  83. self.assertFreed(its, task1.delay)
  84. finally:
  85. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  86. def test_task_apply_leak_with_pool(self, its=1000):
  87. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  88. @self.app.task
  89. def task2():
  90. pass
  91. try:
  92. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  93. except AttributeError:
  94. raise SkipTest('This version does not support autopool')
  95. self.app.conf.BROKER_POOL_LIMIT = 10
  96. try:
  97. self.app._pool = None
  98. self.assertFreed(its, task2.delay)
  99. finally:
  100. self.app.conf.BROKER_POOL_LIMIT = pool_limit
# Run this module's tests with unittest's runner when executed as a script.
if __name__ == '__main__':
    unittest.main()