test_autoscale.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. from __future__ import absolute_import
  2. import sys
  3. from time import time
  4. from mock import Mock, patch
  5. from celery.concurrency.base import BasePool
  6. from celery.worker import state
  7. from celery.worker import autoscale
  8. from celery.tests.utils import Case, sleepdeprived
  9. class Object(object):
  10. pass
  11. class MockPool(BasePool):
  12. shrink_raises_exception = False
  13. shrink_raises_ValueError = False
  14. def __init__(self, *args, **kwargs):
  15. super(MockPool, self).__init__(*args, **kwargs)
  16. self._pool = Object()
  17. self._pool._processes = self.limit
  18. def grow(self, n=1):
  19. self._pool._processes += n
  20. def shrink(self, n=1):
  21. if self.shrink_raises_exception:
  22. raise KeyError("foo")
  23. if self.shrink_raises_ValueError:
  24. raise ValueError("foo")
  25. self._pool._processes -= n
  26. @property
  27. def num_processes(self):
  28. return self._pool._processes
  29. class test_Autoscaler(Case):
  30. def setUp(self):
  31. self.pool = MockPool(3)
  32. def test_stop(self):
  33. class Scaler(autoscale.Autoscaler):
  34. alive = True
  35. joined = False
  36. def is_alive(self):
  37. return self.alive
  38. def join(self, timeout=None):
  39. self.joined = True
  40. x = Scaler(self.pool, 10, 3)
  41. x._is_stopped.set()
  42. x.stop()
  43. self.assertTrue(x.joined)
  44. x.joined = False
  45. x.alive = False
  46. x.stop()
  47. self.assertFalse(x.joined)
  48. @sleepdeprived(autoscale)
  49. def test_body(self):
  50. x = autoscale.Autoscaler(self.pool, 10, 3)
  51. x.body()
  52. self.assertEqual(x.pool.num_processes, 3)
  53. for i in range(20):
  54. state.reserved_requests.add(i)
  55. x.body()
  56. x.body()
  57. self.assertEqual(x.pool.num_processes, 10)
  58. state.reserved_requests.clear()
  59. x.body()
  60. self.assertEqual(x.pool.num_processes, 10)
  61. x._last_action = time() - 10000
  62. x.body()
  63. self.assertEqual(x.pool.num_processes, 3)
  64. def test_run(self):
  65. class Scaler(autoscale.Autoscaler):
  66. scale_called = False
  67. def body(self):
  68. self.scale_called = True
  69. self._is_shutdown.set()
  70. x = Scaler(self.pool, 10, 3)
  71. x.run()
  72. self.assertTrue(x._is_shutdown.isSet())
  73. self.assertTrue(x._is_stopped.isSet())
  74. self.assertTrue(x.scale_called)
  75. def test_shrink_raises_exception(self):
  76. x = autoscale.Autoscaler(self.pool, 10, 3)
  77. x.scale_up(3)
  78. x._last_action = time() - 10000
  79. x.pool.shrink_raises_exception = True
  80. x.scale_down(1)
  81. @patch("celery.worker.autoscale.debug")
  82. def test_shrink_raises_ValueError(self, debug):
  83. x = autoscale.Autoscaler(self.pool, 10, 3)
  84. x.scale_up(3)
  85. x._last_action = time() - 10000
  86. x.pool.shrink_raises_ValueError = True
  87. x.scale_down(1)
  88. self.assertTrue(debug.call_count)
  89. def test_update_and_force(self):
  90. x = autoscale.Autoscaler(self.pool, 10, 3)
  91. self.assertEqual(x.processes, 3)
  92. x.force_scale_up(5)
  93. self.assertEqual(x.processes, 8)
  94. x.update(5, None)
  95. self.assertEqual(x.processes, 5)
  96. x.force_scale_down(3)
  97. self.assertEqual(x.processes, 2)
  98. x.update(3, None)
  99. self.assertEqual(x.processes, 3)
  100. x.force_scale_down(1000)
  101. self.assertEqual(x.min_concurrency, 0)
  102. self.assertEqual(x.processes, 0)
  103. x.force_scale_up(1000)
  104. x.min_concurrency = 1
  105. x.force_scale_down(1)
  106. x.update(max=300, min=10)
  107. x.update(max=300, min=2)
  108. x.update(max=None, min=None)
  109. def test_info(self):
  110. x = autoscale.Autoscaler(self.pool, 10, 3)
  111. info = x.info()
  112. self.assertEqual(info['max'], 10)
  113. self.assertEqual(info['min'], 3)
  114. self.assertEqual(info['current'], 3)
  115. @patch("os._exit")
  116. def test_thread_crash(self, _exit):
  117. class _Autoscaler(autoscale.Autoscaler):
  118. def body(self):
  119. self._is_shutdown.set()
  120. raise OSError("foo")
  121. x = _Autoscaler(self.pool, 10, 3)
  122. stderr = Mock()
  123. p, sys.stderr = sys.stderr, stderr
  124. try:
  125. x.run()
  126. finally:
  127. sys.stderr = p
  128. _exit.assert_called_with(1)
  129. self.assertTrue(stderr.write.call_count)