# autoscale.py (2.4 KB)
import logging
import os
import sys
import threading
import traceback
from time import sleep, time

from celery.worker import state
  7. class Autoscaler(threading.Thread):
  8. def __init__(self, pool, max_concurrency, min_concurrency=0,
  9. keepalive=30, logger=None):
  10. threading.Thread.__init__(self)
  11. self.pool = pool
  12. self.max_concurrency = max_concurrency
  13. self.min_concurrency = min_concurrency
  14. self.keepalive = keepalive
  15. self.logger = logger
  16. self._last_action = None
  17. self._shutdown = threading.Event()
  18. self._stopped = threading.Event()
  19. self.setDaemon(True)
  20. self.setName(self.__class__.__name__)
  21. assert self.keepalive, "can't scale down too fast."
  22. def scale(self):
  23. current = min(self.qty, self.max_concurrency)
  24. if current > self.processes:
  25. self.scale_up(current - self.processes)
  26. elif current < self.processes:
  27. self.scale_down((self.processes - current) - self.min_concurrency)
  28. sleep(1.0)
  29. def scale_up(self, n):
  30. self.logger.info("Scaling up %s processes." % (n, ))
  31. self._last_action = time()
  32. return self.pool.grow(n)
  33. def scale_down(self, n):
  34. if not self._last_action or not n:
  35. return
  36. if time() - self._last_action > self.keepalive:
  37. self.logger.info("Scaling down %s processes." % (n, ))
  38. self._last_action = time()
  39. try:
  40. self.pool.shrink(n)
  41. except ValueError:
  42. self.logger.debug(
  43. "Autoscaler won't scale down: all processes busy.")
  44. except Exception, exc:
  45. self.logger.error("Autoscaler: scale_down: %r\n%r" % (
  46. exc, traceback.format_stack()),
  47. exc_info=sys.exc_info())
  48. def run(self):
  49. while not self._shutdown.isSet():
  50. try:
  51. self.scale()
  52. except Exception, exc:
  53. self.logger.error("Thread Autoscaler crashed: %r" % (exc, ),
  54. exc_info=sys.exc_info())
  55. os._exit(1)
  56. self._stopped.set()
  57. def stop(self):
  58. self._shutdown.set()
  59. self._stopped.wait()
  60. if self.isAlive():
  61. self.join(1e10)
  62. @property
  63. def qty(self):
  64. return len(state.reserved_requests)
  65. @property
  66. def processes(self):
  67. return self.pool._pool._processes