test_leak.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import gc
  2. import os
  3. import sys
  4. import shlex
  5. import subprocess
  6. sys.path.insert(0, os.getcwd())
  7. sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
  8. from nose import SkipTest
  9. from celery import current_app
  10. from celery.tests.utils import unittest
  11. import suite
  12. GET_RSIZE = '/bin/ps -p %(pid)s -o rss='
  13. QUICKTEST = int(os.environ.get('QUICKTEST', 0))
  14. class Sizes(list):
  15. def add(self, item):
  16. if item not in self:
  17. self.append(item)
  18. def average(self):
  19. return sum(self) / len(self)
  20. class LeakFunCase(unittest.TestCase):
  21. def setUp(self):
  22. self.app = current_app
  23. self.debug = os.environ.get('TEST_LEAK_DEBUG', False)
  24. def get_rsize(self, cmd=GET_RSIZE):
  25. try:
  26. return int(subprocess.Popen(
  27. shlex.split(cmd % {'pid': os.getpid()}),
  28. stdout=subprocess.PIPE).communicate()[0].strip())
  29. except OSError, exc:
  30. raise SkipTest("Can't execute command: %r: %r" % (cmd, exc))
  31. def sample_allocated(self, fun, *args, **kwargs):
  32. before = self.get_rsize()
  33. fun(*args, **kwargs)
  34. gc.collect()
  35. after = self.get_rsize()
  36. return before, after
  37. def appx(self, s, r=1):
  38. """r==1 (10e1): Keep up to hundred kB,
  39. e.g. 16,268MB becomes 16,2MB."""
  40. return int(s / 10.0 ** (r + 1)) / 10.0
  41. def assertFreed(self, n, fun, *args, **kwargs):
  42. # call function first to load lazy modules etc.
  43. fun(*args, **kwargs)
  44. try:
  45. base = self.get_rsize()
  46. first = None
  47. sizes = Sizes()
  48. for i in xrange(n):
  49. before, after = self.sample_allocated(fun, *args, **kwargs)
  50. if not first:
  51. first = after
  52. if self.debug:
  53. print('%r %s: before/after: %s/%s' % (
  54. fun, i, before, after))
  55. else:
  56. sys.stderr.write('.')
  57. sizes.add(self.appx(after))
  58. self.assertEqual(gc.collect(), 0)
  59. self.assertEqual(gc.garbage, [])
  60. try:
  61. assert self.appx(first) >= self.appx(after)
  62. except AssertionError:
  63. print('BASE: %r AVG: %r SIZES: %r' % (
  64. base, sizes.average(), sizes, ))
  65. raise
  66. finally:
  67. self.app.control.purge()
  68. class test_leaks(LeakFunCase):
  69. def test_task_apply_leak(self):
  70. its = QUICKTEST and 10 or 1000
  71. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  72. @self.app.task
  73. def task1():
  74. pass
  75. try:
  76. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  77. except AttributeError:
  78. return self.assertFreed(self.iterations, foo.delay)
  79. self.app.conf.BROKER_POOL_LIMIT = None
  80. try:
  81. self.app._pool = None
  82. self.assertFreed(its, task1.delay)
  83. finally:
  84. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  85. def test_task_apply_leak_with_pool(self):
  86. its = QUICKTEST and 10 or 1000
  87. self.assertNotEqual(self.app.conf.BROKER_TRANSPORT, 'memory')
  88. @self.app.task
  89. def task2():
  90. pass
  91. try:
  92. pool_limit = self.app.conf.BROKER_POOL_LIMIT
  93. except AttributeError:
  94. raise SkipTest('This version does not support autopool')
  95. self.app.conf.BROKER_POOL_LIMIT = 10
  96. try:
  97. self.app._pool = None
  98. self.assertFreed(its, task2.delay)
  99. finally:
  100. self.app.conf.BROKER_POOL_LIMIT = pool_limit
  101. if __name__ == '__main__':
  102. unittest.main()