test_autoscale.py 6.6 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import sys
  3. from case import Mock, mock, patch
  4. from celery.concurrency.base import BasePool
  5. from celery.five import monotonic
  6. from celery.utils.objects import Bunch
  7. from celery.worker import autoscale, state
  8. class MockPool(BasePool):
  9. shrink_raises_exception = False
  10. shrink_raises_ValueError = False
  11. def __init__(self, *args, **kwargs):
  12. super(MockPool, self).__init__(*args, **kwargs)
  13. self._pool = Bunch(_processes=self.limit)
  14. def grow(self, n=1):
  15. self._pool._processes += n
  16. def shrink(self, n=1):
  17. if self.shrink_raises_exception:
  18. raise KeyError('foo')
  19. if self.shrink_raises_ValueError:
  20. raise ValueError('foo')
  21. self._pool._processes -= n
  22. @property
  23. def num_processes(self):
  24. return self._pool._processes
  25. class test_WorkerComponent:
  26. def test_register_with_event_loop(self):
  27. parent = Mock(name='parent')
  28. parent.autoscale = True
  29. parent.consumer.on_task_message = set()
  30. w = autoscale.WorkerComponent(parent)
  31. assert parent.autoscaler is None
  32. assert w.enabled
  33. hub = Mock(name='hub')
  34. w.create(parent)
  35. w.register_with_event_loop(parent, hub)
  36. assert (parent.autoscaler.maybe_scale in
  37. parent.consumer.on_task_message)
  38. hub.call_repeatedly.assert_called_with(
  39. parent.autoscaler.keepalive, parent.autoscaler.maybe_scale,
  40. )
  41. parent.hub = hub
  42. hub.on_init = []
  43. w.instantiate = Mock()
  44. w.register_with_event_loop(parent, Mock(name='loop'))
  45. assert parent.consumer.on_task_message
  46. def test_info_without_event_loop(self):
  47. parent = Mock(name='parent')
  48. parent.autoscale = True
  49. parent.max_concurrency = '10'
  50. parent.min_concurrency = '2'
  51. parent.use_eventloop = False
  52. w = autoscale.WorkerComponent(parent)
  53. w.create(parent)
  54. info = w.info(parent)
  55. assert 'autoscaler' in info
  56. assert parent.autoscaler_cls().info.called
  57. class test_Autoscaler:
  58. def setup(self):
  59. self.pool = MockPool(3)
  60. def test_stop(self):
  61. class Scaler(autoscale.Autoscaler):
  62. alive = True
  63. joined = False
  64. def is_alive(self):
  65. return self.alive
  66. def join(self, timeout=None):
  67. self.joined = True
  68. worker = Mock(name='worker')
  69. x = Scaler(self.pool, 10, 3, worker=worker)
  70. x._is_stopped.set()
  71. x.stop()
  72. assert x.joined
  73. x.joined = False
  74. x.alive = False
  75. x.stop()
  76. assert not x.joined
  77. @mock.sleepdeprived(module=autoscale)
  78. def test_body(self):
  79. worker = Mock(name='worker')
  80. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  81. x.body()
  82. assert x.pool.num_processes == 3
  83. _keep = [Mock(name='req{0}'.format(i)) for i in range(20)]
  84. [state.task_reserved(m) for m in _keep]
  85. x.body()
  86. x.body()
  87. assert x.pool.num_processes == 10
  88. worker.consumer._update_prefetch_count.assert_called()
  89. state.reserved_requests.clear()
  90. x.body()
  91. assert x.pool.num_processes == 10
  92. x._last_scale_up = monotonic() - 10000
  93. x.body()
  94. assert x.pool.num_processes == 3
  95. worker.consumer._update_prefetch_count.assert_called()
  96. def test_run(self):
  97. class Scaler(autoscale.Autoscaler):
  98. scale_called = False
  99. def body(self):
  100. self.scale_called = True
  101. self._is_shutdown.set()
  102. worker = Mock(name='worker')
  103. x = Scaler(self.pool, 10, 3, worker=worker)
  104. x.run()
  105. assert x._is_shutdown.isSet()
  106. assert x._is_stopped.isSet()
  107. assert x.scale_called
  108. def test_shrink_raises_exception(self):
  109. worker = Mock(name='worker')
  110. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  111. x.scale_up(3)
  112. x.pool.shrink_raises_exception = True
  113. x._shrink(1)
  114. @patch('celery.worker.autoscale.debug')
  115. def test_shrink_raises_ValueError(self, debug):
  116. worker = Mock(name='worker')
  117. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  118. x.scale_up(3)
  119. x._last_scale_up = monotonic() - 10000
  120. x.pool.shrink_raises_ValueError = True
  121. x.scale_down(1)
  122. assert debug.call_count
  123. def test_update_and_force(self):
  124. worker = Mock(name='worker')
  125. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  126. assert x.processes == 3
  127. x.force_scale_up(5)
  128. assert x.processes == 8
  129. x.update(5, None)
  130. assert x.processes == 5
  131. x.force_scale_down(3)
  132. assert x.processes == 2
  133. x.update(None, 3)
  134. assert x.processes == 3
  135. x.force_scale_down(1000)
  136. assert x.min_concurrency == 0
  137. assert x.processes == 0
  138. x.force_scale_up(1000)
  139. x.min_concurrency = 1
  140. x.force_scale_down(1)
  141. x.update(max=300, min=10)
  142. x.update(max=300, min=2)
  143. x.update(max=None, min=None)
  144. def test_info(self):
  145. worker = Mock(name='worker')
  146. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  147. info = x.info()
  148. assert info['max'] == 10
  149. assert info['min'] == 3
  150. assert info['current'] == 3
  151. @patch('os._exit')
  152. def test_thread_crash(self, _exit):
  153. class _Autoscaler(autoscale.Autoscaler):
  154. def body(self):
  155. self._is_shutdown.set()
  156. raise OSError('foo')
  157. worker = Mock(name='worker')
  158. x = _Autoscaler(self.pool, 10, 3, worker=worker)
  159. stderr = Mock()
  160. p, sys.stderr = sys.stderr, stderr
  161. try:
  162. x.run()
  163. finally:
  164. sys.stderr = p
  165. _exit.assert_called_with(1)
  166. stderr.write.assert_called()
  167. @mock.sleepdeprived(module=autoscale)
  168. def test_no_negative_scale(self):
  169. total_num_processes = []
  170. worker = Mock(name='worker')
  171. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  172. x.body() # the body func scales up or down
  173. _keep = [Mock(name='req{0}'.format(i)) for i in range(35)]
  174. for req in _keep:
  175. state.task_reserved(req)
  176. x.body()
  177. total_num_processes.append(self.pool.num_processes)
  178. for req in _keep:
  179. state.task_ready(req)
  180. x.body()
  181. total_num_processes.append(self.pool.num_processes)
  182. assert all(x.min_concurrency <= i <= x.max_concurrency
  183. for i in total_num_processes)