test_leak.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. from __future__ import print_function
  2. import gc
  3. import os
  4. import sys
  5. import shlex
  6. import subprocess
  7. sys.path.insert(0, os.getcwd())
  8. sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
  9. from celery import current_app
  10. from celery.five import range
  11. from celery.tests.case import SkipTest, unittest
  12. import suite # noqa
  # Command template used to read this process's memory size; `{pid}` is
  # substituted with os.getpid() before the command is executed.
  # NOTE(review): `-o rss=` presumably prints the resident set size with the
  # header suppressed (kilobytes on most platforms) -- confirm against ps(1).
  13. GET_RSIZE = '/bin/ps -p {pid} -o rss='
  class Sizes(list):
      """List of size samples that keeps each distinct value only once."""

      def add(self, item):
          """Append *item* unless an equal value is already stored."""
          if item not in self:
              self.append(item)

      def average(self):
          """Return the arithmetic mean of the stored samples.

          Returns ``0.0`` for an empty collection rather than raising
          ``ZeroDivisionError``, so diagnostic output never fails on its own.
          """
          if not self:
              return 0.0
          # float() keeps true division on Python 2 when samples are ints.
          return sum(self) / float(len(self))
  class LeakFunCase(unittest.TestCase):
      """Base test case with helpers for detecting memory growth.

      The process size is sampled by running an external command (see
      ``get_rsize``) before and after calling the function under test.
      """

      def setUp(self):
          self.app = current_app
          # Any non-empty TEST_LEAK_DEBUG value switches from progress dots
          # to a printed before/after line per iteration.
          self.debug = os.environ.get('TEST_LEAK_DEBUG', False)

      def get_rsize(self, cmd=GET_RSIZE):
          """Return the integer printed by *cmd* for the current process.

          *cmd* is a command template; ``{pid}`` is replaced with the
          current process id.

          Raises:
              SkipTest: if the command cannot be executed, or if its output
                  is not an integer (e.g. the command printed nothing).
          """
          argv = shlex.split(cmd.format(pid=os.getpid()))
          try:
              output = subprocess.Popen(
                  argv, stdout=subprocess.PIPE).communicate()[0]
          except OSError as exc:
              raise SkipTest(
                  'Cannot execute command: {0!r}: {1!r}'.format(cmd, exc))
          try:
              return int(output.strip())
          except ValueError as exc:
              # Without a usable measurement the test cannot be evaluated,
              # so skip it just like when the command is missing.
              raise SkipTest(
                  'Cannot parse output of command: {0!r}: {1!r}'.format(
                      cmd, exc))

      def sample_allocated(self, fun, *args, **kwargs):
          """Call ``fun(*args, **kwargs)`` once and measure around it.

          Returns a ``(before, after)`` tuple of ``get_rsize()`` values; a
          garbage collection is forced before the second measurement.
          """
          before = self.get_rsize()
          fun(*args, **kwargs)
          gc.collect()
          after = self.get_rsize()
          return before, after

      def appx(self, s, r=1):
          """Truncate *s* to one decimal place in units of ``10 ** (r + 2)``.

          With the default ``r=1``, ``16268`` becomes ``16.2``; differences
          smaller than one hundred units of *s* are discarded.
          """
          return int(s / 10.0 ** (r + 1)) / 10.0

      def assertFreed(self, n, fun, *args, **kwargs):
          """Fail if repeatedly calling *fun* leaves the process larger.

          *fun* is called once to warm up, then *n* more times while the
          size is sampled.  The check fails when the approximated size after
          the last call exceeds the one after the first sampled call, or
          when the garbage collector finds unreachable objects.  The
          broker queue is purged afterwards in every case.
          """
          # call function first to load lazy modules etc.
          fun(*args, **kwargs)

          try:
              base = self.get_rsize()
              first = after = None
              sizes = Sizes()
              for i in range(n):
                  before, after = self.sample_allocated(fun, *args, **kwargs)
                  # Compare against None so a sample of 0 still counts.
                  if first is None:
                      first = after
                  if self.debug:
                      print('{0!r} {1}: before/after: {2}/{3}'.format(
                          fun, i, before, after))
                  else:
                      sys.stderr.write('.')
                  sizes.add(self.appx(after))
              self.assertEqual(gc.collect(), 0)
              self.assertEqual(gc.garbage, [])
              if first is None:
                  # n < 1: nothing was sampled, so there is nothing to compare.
                  return
              # Explicit check instead of a bare ``assert`` so the test still
              # works when Python runs with -O.
              if self.appx(first) < self.appx(after):
                  print('BASE: {0!r} AVG: {1!r} SIZES: {2!r}'.format(
                      base, sizes.average(), sizes))
                  self.fail(
                      'Memory not freed: first sample {0!r}, '
                      'last sample {1!r}'.format(first, after))
          finally:
              self.app.control.purge()
  class test_leaks(LeakFunCase):
      """Leak checks for publishing a task with ``delay``."""

      def _assert_freed_with_pool_limit(self, limit, original, its, fun):
          """Run ``assertFreed(its, fun)`` with BROKER_POOL_LIMIT set to *limit*.

          The existing pool is discarded first, and the setting is restored
          to *original* afterwards even if the check fails.
          """
          self.app.conf.BROKER_POOL_LIMIT = limit
          try:
              self.app._pool = None
              self.assertFreed(its, fun)
          finally:
              self.app.conf.BROKER_POOL_LIMIT = original

      def test_task_apply_leak(self, its=1000):
          """Calling ``delay`` *its* times with the pool limit set to None."""
          self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')

          @self.app.task
          def task1():
              pass

          try:
              pool_limit = self.app.conf.BROKER_POOL_LIMIT
          except AttributeError:
              # No pool setting to change: run the check as-is.  Uses ``its``;
              # the previous ``self.iterations`` is not defined in this module.
              return self.assertFreed(its, task1.delay)

          self._assert_freed_with_pool_limit(
              None, pool_limit, its, task1.delay)

      def test_task_apply_leak_with_pool(self, its=1000):
          """Calling ``delay`` *its* times with the pool limit set to 10."""
          self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')

          @self.app.task
          def task2():
              pass

          try:
              pool_limit = self.app.conf.BROKER_POOL_LIMIT
          except AttributeError:
              raise SkipTest('This version does not support autopool')

          self._assert_freed_with_pool_limit(
              10, pool_limit, its, task2.delay)
  # Run the test cases when this file is executed directly as a script.
  101. if __name__ == '__main__':
  102. unittest.main()