test_worker_autoscale.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import logging
  2. from time import time
  3. from celery.concurrency.base import BasePool
  4. from celery.worker import state
  5. from celery.worker import autoscale
  6. from celery.tests.utils import unittest, sleepdeprived
  7. logger = logging.getLogger("celery.tests.autoscale")
  8. class Object(object):
  9. pass
  10. class MockPool(BasePool):
  11. shrink_raises_exception = False
  12. def __init__(self, *args, **kwargs):
  13. super(MockPool, self).__init__(*args, **kwargs)
  14. self._pool = Object()
  15. self._pool._processes = self.limit
  16. def grow(self, n=1):
  17. self._pool._processes += n
  18. def shrink(self, n=1):
  19. if self.shrink_raises_exception:
  20. raise KeyError("foo")
  21. self._pool._processes -= n
  22. @property
  23. def current(self):
  24. return self._pool._processes
  25. class test_Autoscaler(unittest.TestCase):
  26. def setUp(self):
  27. self.pool = MockPool(3)
  28. def test_stop(self):
  29. class Scaler(autoscale.Autoscaler):
  30. alive = True
  31. joined = False
  32. def isAlive(self):
  33. return self.alive
  34. def join(self, timeout=None):
  35. self.joined = True
  36. x = Scaler(self.pool, 10, 3, logger=logger)
  37. x._stopped.set()
  38. x.stop()
  39. self.assertTrue(x.joined)
  40. x.joined = False
  41. x.alive = False
  42. x.stop()
  43. self.assertFalse(x.joined)
  44. @sleepdeprived(autoscale)
  45. def test_scale(self):
  46. x = autoscale.Autoscaler(self.pool, 10, 3, logger=logger)
  47. x.scale()
  48. self.assertEqual(x.pool.current, 3)
  49. for i in range(20):
  50. state.reserved_requests.add(i)
  51. x.scale()
  52. x.scale()
  53. self.assertEqual(x.pool.current, 10)
  54. state.reserved_requests.clear()
  55. x.scale()
  56. self.assertEqual(x.pool.current, 10)
  57. x._last_action = time() - 10000
  58. x.scale()
  59. self.assertEqual(x.pool.current, 3)
  60. def test_run(self):
  61. class Scaler(autoscale.Autoscaler):
  62. scale_called = False
  63. def scale(self):
  64. self.scale_called = True
  65. self._shutdown.set()
  66. x = Scaler(self.pool, 10, 3, logger=logger)
  67. x.run()
  68. self.assertTrue(x._shutdown.isSet())
  69. self.assertTrue(x._stopped.isSet())
  70. self.assertTrue(x.scale_called)
  71. def test_shrink_raises_exception(self):
  72. x = autoscale.Autoscaler(self.pool, 10, 3, logger=logger)
  73. x.scale_up(3)
  74. x._last_action = time() - 10000
  75. x.pool.shrink_raises_exception = True
  76. x.scale_down(1)