# autoscale.py (2.0 KB)

import logging
import sys
import threading
import traceback
from time import sleep, time

from celery.worker import state
  6. class Autoscaler(threading.Thread):
  7. def __init__(self, pool, max_concurrency, min_concurrency=0,
  8. keepalive=30, logger=None):
  9. threading.Thread.__init__(self)
  10. self.pool = pool
  11. self.max_concurrency = max_concurrency
  12. self.min_concurrency = min_concurrency
  13. self.keepalive = keepalive
  14. self.logger = logger
  15. self._last_action = None
  16. self._shutdown = threading.Event()
  17. self._stopped = threading.Event()
  18. self.setDaemon(True)
  19. self.setName(self.__class__.__name__)
  20. assert self.keepalive, "can't scale down too fast."
  21. def scale(self):
  22. current = min(self.qty, self.max_concurrency)
  23. if current > self.processes:
  24. self.scale_up(current - self.processes)
  25. elif current < self.processes:
  26. self.scale_down((self.processes - current) - self.min_concurrency)
  27. sleep(1.0)
  28. def scale_up(self, n):
  29. self.logger.info("Scaling up %s processes." % (n, ))
  30. self._last_action = time()
  31. return self.pool.grow(n)
  32. def scale_down(self, n):
  33. if not self._last_action or not n:
  34. return
  35. if time() - self._last_action > self.keepalive:
  36. self.logger.info("Scaling down %s processes." % (n, ))
  37. self._last_action = time()
  38. try:
  39. self.pool.shrink(n)
  40. except Exception, exc:
  41. traceback.print_stack()
  42. self.logger.error("Autoscaler: scale_down: %r" % (exc, ),
  43. exc_info=sys.exc_info())
  44. def run(self):
  45. while not self._shutdown.isSet():
  46. self.scale()
  47. self._stopped.set()
  48. def stop(self):
  49. self._shutdown.set()
  50. self._stopped.wait()
  51. self.join(1e100)
  52. @property
  53. def qty(self):
  54. return len(state.reserved_requests)
  55. @property
  56. def processes(self):
  57. return self.pool._pool._processes