test_leak.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. from __future__ import absolute_import, print_function, unicode_literals
  2. import gc
  3. import os
  4. import sys
  5. import shlex
  6. import subprocess
  7. from celery import current_app
  8. from celery.tests.case import SkipTest, unittest
  9. import suite # noqa
  10. GET_RSIZE = b'/bin/ps -p {pid} -o rss='
  11. class Sizes(list):
  12. def add(self, item):
  13. if item not in self:
  14. self.append(item)
  15. def average(self):
  16. return sum(self) / len(self)
  17. class LeakFunCase(unittest.TestCase):
  18. def setUp(self):
  19. self.app = current_app
  20. self.debug = os.environ.get('TEST_LEAK_DEBUG', False)
  21. def get_rsize(self, cmd=GET_RSIZE):
  22. try:
  23. return int(subprocess.Popen(
  24. shlex.split(cmd.format(pid=os.getpid())),
  25. stdout=subprocess.PIPE).communicate()[0].strip()
  26. )
  27. except OSError as exc:
  28. raise SkipTest(
  29. 'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))
  30. def sample_allocated(self, fun, *args, **kwargs):
  31. before = self.get_rsize()
  32. fun(*args, **kwargs)
  33. gc.collect()
  34. after = self.get_rsize()
  35. return before, after
  36. def appx(self, s, r=1):
  37. """r==1 (10e1): Keep up to hundred kB,
  38. e.g. 16,268MB becomes 16,2MB."""
  39. return int(s / 10.0 ** (r + 1)) / 10.0
  40. def assertFreed(self, n, fun, *args, **kwargs):
  41. # call function first to load lazy modules etc.
  42. fun(*args, **kwargs)
  43. try:
  44. base = self.get_rsize()
  45. first = None
  46. sizes = Sizes()
  47. for i in range(n):
  48. before, after = self.sample_allocated(fun, *args, **kwargs)
  49. if not first:
  50. first = after
  51. if self.debug:
  52. print('{0!r} {1}: before/after: {2}/{3}'.format(
  53. fun, i, before, after))
  54. else:
  55. sys.stderr.write('.')
  56. sizes.add(self.appx(after))
  57. self.assertEqual(gc.collect(), 0)
  58. self.assertEqual(gc.garbage, [])
  59. try:
  60. assert self.appx(first) >= self.appx(after)
  61. except AssertionError:
  62. print('base: {0!r} avg: {1!r} sizes: {2!r}'.format(
  63. base, sizes.average(), sizes))
  64. raise
  65. finally:
  66. self.app.control.purge()
  67. class test_leaks(LeakFunCase):
  68. def test_task_apply_leak(self, its=1000):
  69. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  70. @self.app.task
  71. def task1():
  72. pass
  73. try:
  74. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  75. except AttributeError:
  76. return self.assertFreed(self.iterations, task1.delay)
  77. self.app.conf.BROKER_POOL_LIMIT = None
  78. try:
  79. self.app._pool = None
  80. self.assertFreed(its, task1.delay)
  81. finally:
  82. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  83. def test_task_apply_leak_with_pool(self, its=1000):
  84. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  85. @self.app.task
  86. def task2():
  87. pass
  88. try:
  89. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  90. except AttributeError:
  91. raise SkipTest('This version does not support autopool')
  92. self.app.conf.BROKER_POOL_LIMIT = 10
  93. try:
  94. self.app._pool = None
  95. self.assertFreed(its, task2.delay)
  96. finally:
  97. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  98. if __name__ == '__main__':
  99. unittest.main()