test_autoscale.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. from __future__ import absolute_import, unicode_literals
  2. import sys
  3. from case import Mock, mock, patch
  4. from celery.concurrency.base import BasePool
  5. from celery.five import monotonic
  6. from celery.worker import state
  7. from celery.worker import autoscale
  8. from celery.utils.objects import Bunch
  9. class MockPool(BasePool):
  10. shrink_raises_exception = False
  11. shrink_raises_ValueError = False
  12. def __init__(self, *args, **kwargs):
  13. super(MockPool, self).__init__(*args, **kwargs)
  14. self._pool = Bunch(_processes=self.limit)
  15. def grow(self, n=1):
  16. self._pool._processes += n
  17. def shrink(self, n=1):
  18. if self.shrink_raises_exception:
  19. raise KeyError('foo')
  20. if self.shrink_raises_ValueError:
  21. raise ValueError('foo')
  22. self._pool._processes -= n
  23. @property
  24. def num_processes(self):
  25. return self._pool._processes
  26. class test_WorkerComponent:
  27. def test_register_with_event_loop(self):
  28. parent = Mock(name='parent')
  29. parent.autoscale = True
  30. parent.consumer.on_task_message = set()
  31. w = autoscale.WorkerComponent(parent)
  32. assert parent.autoscaler is None
  33. assert w.enabled
  34. hub = Mock(name='hub')
  35. w.create(parent)
  36. w.register_with_event_loop(parent, hub)
  37. assert (parent.autoscaler.maybe_scale in
  38. parent.consumer.on_task_message)
  39. hub.call_repeatedly.assert_called_with(
  40. parent.autoscaler.keepalive, parent.autoscaler.maybe_scale,
  41. )
  42. parent.hub = hub
  43. hub.on_init = []
  44. w.instantiate = Mock()
  45. w.register_with_event_loop(parent, Mock(name='loop'))
  46. assert parent.consumer.on_task_message
  47. class test_Autoscaler:
  48. def setup(self):
  49. self.pool = MockPool(3)
  50. def test_stop(self):
  51. class Scaler(autoscale.Autoscaler):
  52. alive = True
  53. joined = False
  54. def is_alive(self):
  55. return self.alive
  56. def join(self, timeout=None):
  57. self.joined = True
  58. worker = Mock(name='worker')
  59. x = Scaler(self.pool, 10, 3, worker=worker)
  60. x._is_stopped.set()
  61. x.stop()
  62. assert x.joined
  63. x.joined = False
  64. x.alive = False
  65. x.stop()
  66. assert not x.joined
  67. @mock.sleepdeprived(module=autoscale)
  68. def test_body(self):
  69. worker = Mock(name='worker')
  70. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  71. x.body()
  72. assert x.pool.num_processes == 3
  73. _keep = [Mock(name='req{0}'.format(i)) for i in range(20)]
  74. [state.task_reserved(m) for m in _keep]
  75. x.body()
  76. x.body()
  77. assert x.pool.num_processes == 10
  78. worker.consumer._update_prefetch_count.assert_called()
  79. state.reserved_requests.clear()
  80. x.body()
  81. assert x.pool.num_processes == 10
  82. x._last_scale_up = monotonic() - 10000
  83. x.body()
  84. assert x.pool.num_processes == 3
  85. worker.consumer._update_prefetch_count.assert_called()
  86. def test_run(self):
  87. class Scaler(autoscale.Autoscaler):
  88. scale_called = False
  89. def body(self):
  90. self.scale_called = True
  91. self._is_shutdown.set()
  92. worker = Mock(name='worker')
  93. x = Scaler(self.pool, 10, 3, worker=worker)
  94. x.run()
  95. assert x._is_shutdown.isSet()
  96. assert x._is_stopped.isSet()
  97. assert x.scale_called
  98. def test_shrink_raises_exception(self):
  99. worker = Mock(name='worker')
  100. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  101. x.scale_up(3)
  102. x.pool.shrink_raises_exception = True
  103. x._shrink(1)
  104. @patch('celery.worker.autoscale.debug')
  105. def test_shrink_raises_ValueError(self, debug):
  106. worker = Mock(name='worker')
  107. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  108. x.scale_up(3)
  109. x._last_scale_up = monotonic() - 10000
  110. x.pool.shrink_raises_ValueError = True
  111. x.scale_down(1)
  112. assert debug.call_count
  113. def test_update_and_force(self):
  114. worker = Mock(name='worker')
  115. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  116. assert x.processes == 3
  117. x.force_scale_up(5)
  118. assert x.processes == 8
  119. x.update(5, None)
  120. assert x.processes == 5
  121. x.force_scale_down(3)
  122. assert x.processes == 2
  123. x.update(None, 3)
  124. assert x.processes == 3
  125. x.force_scale_down(1000)
  126. assert x.min_concurrency == 0
  127. assert x.processes == 0
  128. x.force_scale_up(1000)
  129. x.min_concurrency = 1
  130. x.force_scale_down(1)
  131. x.update(max=300, min=10)
  132. x.update(max=300, min=2)
  133. x.update(max=None, min=None)
  134. def test_info(self):
  135. worker = Mock(name='worker')
  136. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  137. info = x.info()
  138. assert info['max'] == 10
  139. assert info['min'] == 3
  140. assert info['current'] == 3
  141. @patch('os._exit')
  142. def test_thread_crash(self, _exit):
  143. class _Autoscaler(autoscale.Autoscaler):
  144. def body(self):
  145. self._is_shutdown.set()
  146. raise OSError('foo')
  147. worker = Mock(name='worker')
  148. x = _Autoscaler(self.pool, 10, 3, worker=worker)
  149. stderr = Mock()
  150. p, sys.stderr = sys.stderr, stderr
  151. try:
  152. x.run()
  153. finally:
  154. sys.stderr = p
  155. _exit.assert_called_with(1)
  156. stderr.write.assert_called()
  157. @mock.sleepdeprived(module=autoscale)
  158. def test_no_negative_scale(self):
  159. total_num_processes = []
  160. worker = Mock(name='worker')
  161. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  162. x.body() # the body func scales up or down
  163. _keep = [Mock(name='req{0}'.format(i)) for i in range(35)]
  164. for req in _keep:
  165. state.task_reserved(req)
  166. x.body()
  167. total_num_processes.append(self.pool.num_processes)
  168. for req in _keep:
  169. state.task_ready(req)
  170. x.body()
  171. total_num_processes.append(self.pool.num_processes)
  172. assert all(x.min_concurrency <= i <= x.max_concurrency
  173. for i in total_num_processes)