test_autoscale.py 6.2 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. import sys
  4. from case import Mock, mock, patch
  5. from celery.concurrency.base import BasePool
  6. from celery.five import monotonic
  7. from celery.worker import state
  8. from celery.worker import autoscale
  9. from celery.utils.objects import Bunch
  10. class MockPool(BasePool):
  11. shrink_raises_exception = False
  12. shrink_raises_ValueError = False
  13. def __init__(self, *args, **kwargs):
  14. super(MockPool, self).__init__(*args, **kwargs)
  15. self._pool = Bunch(_processes=self.limit)
  16. def grow(self, n=1):
  17. self._pool._processes += n
  18. def shrink(self, n=1):
  19. if self.shrink_raises_exception:
  20. raise KeyError('foo')
  21. if self.shrink_raises_ValueError:
  22. raise ValueError('foo')
  23. self._pool._processes -= n
  24. @property
  25. def num_processes(self):
  26. return self._pool._processes
  27. class test_WorkerComponent:
  28. def test_register_with_event_loop(self):
  29. parent = Mock(name='parent')
  30. parent.autoscale = True
  31. parent.consumer.on_task_message = set()
  32. w = autoscale.WorkerComponent(parent)
  33. assert parent.autoscaler is None
  34. assert w.enabled
  35. hub = Mock(name='hub')
  36. w.create(parent)
  37. w.register_with_event_loop(parent, hub)
  38. assert (parent.autoscaler.maybe_scale in
  39. parent.consumer.on_task_message)
  40. hub.call_repeatedly.assert_called_with(
  41. parent.autoscaler.keepalive, parent.autoscaler.maybe_scale,
  42. )
  43. parent.hub = hub
  44. hub.on_init = []
  45. w.instantiate = Mock()
  46. w.register_with_event_loop(parent, Mock(name='loop'))
  47. assert parent.consumer.on_task_message
  48. class test_Autoscaler:
  49. def setup(self):
  50. self.pool = MockPool(3)
  51. def test_stop(self):
  52. class Scaler(autoscale.Autoscaler):
  53. alive = True
  54. joined = False
  55. def is_alive(self):
  56. return self.alive
  57. def join(self, timeout=None):
  58. self.joined = True
  59. worker = Mock(name='worker')
  60. x = Scaler(self.pool, 10, 3, worker=worker)
  61. x._is_stopped.set()
  62. x.stop()
  63. assert x.joined
  64. x.joined = False
  65. x.alive = False
  66. x.stop()
  67. assert not x.joined
  68. @mock.sleepdeprived(module=autoscale)
  69. def test_body(self):
  70. worker = Mock(name='worker')
  71. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  72. x.body()
  73. assert x.pool.num_processes == 3
  74. _keep = [Mock(name='req{0}'.format(i)) for i in range(20)]
  75. [state.task_reserved(m) for m in _keep]
  76. x.body()
  77. x.body()
  78. assert x.pool.num_processes == 10
  79. worker.consumer._update_prefetch_count.assert_called()
  80. state.reserved_requests.clear()
  81. x.body()
  82. assert x.pool.num_processes == 10
  83. x._last_scale_up = monotonic() - 10000
  84. x.body()
  85. assert x.pool.num_processes == 3
  86. worker.consumer._update_prefetch_count.assert_called()
  87. def test_run(self):
  88. class Scaler(autoscale.Autoscaler):
  89. scale_called = False
  90. def body(self):
  91. self.scale_called = True
  92. self._is_shutdown.set()
  93. worker = Mock(name='worker')
  94. x = Scaler(self.pool, 10, 3, worker=worker)
  95. x.run()
  96. assert x._is_shutdown.isSet()
  97. assert x._is_stopped.isSet()
  98. assert x.scale_called
  99. def test_shrink_raises_exception(self):
  100. worker = Mock(name='worker')
  101. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  102. x.scale_up(3)
  103. x.pool.shrink_raises_exception = True
  104. x._shrink(1)
  105. @patch('celery.worker.autoscale.debug')
  106. def test_shrink_raises_ValueError(self, debug):
  107. worker = Mock(name='worker')
  108. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  109. x.scale_up(3)
  110. x._last_scale_up = monotonic() - 10000
  111. x.pool.shrink_raises_ValueError = True
  112. x.scale_down(1)
  113. assert debug.call_count
  114. def test_update_and_force(self):
  115. worker = Mock(name='worker')
  116. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  117. assert x.processes == 3
  118. x.force_scale_up(5)
  119. assert x.processes == 8
  120. x.update(5, None)
  121. assert x.processes == 5
  122. x.force_scale_down(3)
  123. assert x.processes == 2
  124. x.update(None, 3)
  125. assert x.processes == 3
  126. x.force_scale_down(1000)
  127. assert x.min_concurrency == 0
  128. assert x.processes == 0
  129. x.force_scale_up(1000)
  130. x.min_concurrency = 1
  131. x.force_scale_down(1)
  132. x.update(max=300, min=10)
  133. x.update(max=300, min=2)
  134. x.update(max=None, min=None)
  135. def test_info(self):
  136. worker = Mock(name='worker')
  137. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  138. info = x.info()
  139. assert info['max'] == 10
  140. assert info['min'] == 3
  141. assert info['current'] == 3
  142. @patch('os._exit')
  143. def test_thread_crash(self, _exit):
  144. class _Autoscaler(autoscale.Autoscaler):
  145. def body(self):
  146. self._is_shutdown.set()
  147. raise OSError('foo')
  148. worker = Mock(name='worker')
  149. x = _Autoscaler(self.pool, 10, 3, worker=worker)
  150. stderr = Mock()
  151. p, sys.stderr = sys.stderr, stderr
  152. try:
  153. x.run()
  154. finally:
  155. sys.stderr = p
  156. _exit.assert_called_with(1)
  157. stderr.write.assert_called()
  158. @mock.sleepdeprived(module=autoscale)
  159. def test_no_negative_scale(self):
  160. total_num_processes = []
  161. worker = Mock(name='worker')
  162. x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
  163. x.body() # the body func scales up or down
  164. _keep = [Mock(name='req{0}'.format(i)) for i in range(35)]
  165. for req in _keep:
  166. state.task_reserved(req)
  167. x.body()
  168. total_num_processes.append(self.pool.num_processes)
  169. for req in _keep:
  170. state.task_ready(req)
  171. x.body()
  172. total_num_processes.append(self.pool.num_processes)
  173. assert all(x.min_concurrency <= i <= x.max_concurrency
  174. for i in total_num_processes)