autoscale.py

# -*- coding: utf-8 -*-
"""
    celery.worker.autoscale
    ~~~~~~~~~~~~~~~~~~~~~~~

    This module implements the internal thread responsible
    for growing and shrinking the pool according to the
    current autoscale settings.

    The autoscale thread is only enabled if autoscale
    has been enabled on the command line.

"""
from __future__ import absolute_import

import threading

from functools import partial
from time import sleep, time

from celery import bootsteps
from celery.utils.log import get_logger
from celery.utils.threads import bgThread

from . import state
from .components import Pool
from .hub import DummyLock

logger = get_logger(__name__)
debug, info, error = logger.debug, logger.info, logger.error


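# Bootstep that attaches an autoscaler to the worker: it runs after the
# Pool step and is only enabled when the worker was started with
# autoscaling (``w.autoscale``).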
class WorkerComponent(bootsteps.StartStopStep):
    name = 'Autoscaler'
    requires = (Pool, )

    def __init__(self, w, **kwargs):
        self.enabled = w.autoscale
        w.autoscaler = None

    def create_threaded(self, w):
        scaler = w.autoscaler = self.instantiate(
            w.autoscaler_cls, w.pool, w.max_concurrency, w.min_concurrency)
        return scaler

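    # With an event loop there is no autoscaler thread; ``maybe_scale`` is
    # driven by the hub instead, running on every task event and on a timer
    # firing every ``keepalive`` seconds (``apply_interval`` takes
    # milliseconds, hence the ``* 1000.0``), so a DummyLock stands in for
    # the real mutex.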
    def on_poll_init(self, scaler, hub):
        hub.on_task.append(scaler.maybe_scale)
        hub.timer.apply_interval(scaler.keepalive * 1000.0,
                                 scaler.maybe_scale)

    def create_ev(self, w):
        scaler = w.autoscaler = self.instantiate(
            w.autoscaler_cls, w.pool, w.max_concurrency, w.min_concurrency,
            mutex=DummyLock())
        w.hub.on_init.append(partial(self.on_poll_init, scaler))

    def create(self, w):
        return (self.create_ev if w.use_eventloop
                else self.create_threaded)(w)


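# Rough standalone usage (a sketch only; in the worker the bootstep above
# creates this).  Assumes ``pool`` exposes grow()/shrink()/maintain_pool()/
# num_processes as used below, and that bgThread provides the usual
# start()/stop() thread interface:
#
#     autoscaler = Autoscaler(pool, max_concurrency=10, min_concurrency=3)
#     autoscaler.start()   # calls body() in a loop in the background
#     ...
#     autoscaler.stop()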
class Autoscaler(bgThread):

    def __init__(self, pool, max_concurrency,
                 min_concurrency=0, keepalive=30, mutex=None):
        super(Autoscaler, self).__init__()
        self.pool = pool
        self.mutex = mutex or threading.Lock()
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        self._last_action = None

        assert self.keepalive, 'cannot scale down too fast.'

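    # Run repeatedly by the background thread: one scaling check under the
    # mutex, then a one second pause before the next round.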
    def body(self):
        with self.mutex:
            self.maybe_scale()
        sleep(1.0)

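    # The scaling decision: the target is the number of reserved requests,
    # capped at ``max_concurrency``.  If the target exceeds the current
    # process count, grow by the shortfall; if it is below, scale down by
    # the surplus less ``min_concurrency``.  Returns True when a scaling
    # call was made, so maybe_scale() knows to let the pool maintain itself.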
    def _maybe_scale(self):
        procs = self.processes
        cur = min(self.qty, self.max_concurrency)
        if cur > procs:
            self.scale_up(cur - procs)
            return True
        elif cur < procs:
            self.scale_down((procs - cur) - self.min_concurrency)
            return True

    def maybe_scale(self):
        if self._maybe_scale():
            self.pool.maintain_pool()

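    # Adjust the concurrency limits at runtime, shrinking or growing right
    # away when the new limits require it (typically reached through remote
    # control, e.g. the ``autoscale`` control command).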
    def update(self, max=None, min=None):
        with self.mutex:
            if max is not None:
                if max < self.max_concurrency:
                    self._shrink(self.processes - max)
                self.max_concurrency = max
            if min is not None:
                if min > self.min_concurrency:
                    self._grow(min - self.min_concurrency)
                self.min_concurrency = min
            return self.max_concurrency, self.min_concurrency

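    # Grow or shrink on demand, widening the limits when the request falls
    # outside them (presumably what backs the ``pool_grow``/``pool_shrink``
    # remote control commands when autoscaling is enabled).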
    def force_scale_up(self, n):
        with self.mutex:
            new = self.processes + n
            if new > self.max_concurrency:
                self.max_concurrency = new
            self.min_concurrency += 1
            self._grow(n)

    def force_scale_down(self, n):
        with self.mutex:
            new = self.processes - n
            if new < self.min_concurrency:
                self.min_concurrency = max(new, 0)
            self._shrink(min(n, self.processes))

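    # scale_up records when it acted; scale_down only shrinks once at least
    # ``keepalive`` seconds have passed since the last scaling action, which
    # is the rate limit the assertion in __init__ is guarding.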
    def scale_up(self, n):
        self._last_action = time()
        return self._grow(n)

    def scale_down(self, n):
        if n and self._last_action and (
                time() - self._last_action > self.keepalive):
            self._last_action = time()
            return self._shrink(n)

    def _grow(self, n):
        info('Scaling up %s processes.', n)
        self.pool.grow(n)

    def _shrink(self, n):
        info('Scaling down %s processes.', n)
        try:
            self.pool.shrink(n)
        except ValueError:
            debug("Autoscaler won't scale down: all processes busy.")
        except Exception as exc:
            error('Autoscaler: scale_down: %r', exc, exc_info=True)

    def info(self):
        return {'max': self.max_concurrency,
                'min': self.min_concurrency,
                'current': self.processes,
                'qty': self.qty}

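    # ``qty`` is the number of requests currently reserved by this worker
    # (see celery.worker.state); ``processes`` is the pool's current size.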
    @property
    def qty(self):
        return len(state.reserved_requests)

    @property
    def processes(self):
        return self.pool.num_processes


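# Illustrative sketch only, not part of this module: a custom scaling policy
# can be plugged in by pointing the worker's ``autoscaler_cls`` at a subclass
# (in this generation of Celery, via the CELERYD_AUTOSCALER setting).
# ``ConservativeAutoscaler`` below is a hypothetical example that grows more
# cautiously:
#
#     class ConservativeAutoscaler(Autoscaler):
#
#         def _maybe_scale(self):
#             procs = self.processes
#             cur = min(self.qty, self.max_concurrency)
#             if cur > procs:
#                 # grow half-way to the target instead of all at once
#                 self.scale_up(max(1, (cur - procs) // 2))
#                 return True
#             elif cur < procs:
#                 self.scale_down((procs - cur) - self.min_concurrency)
#                 return True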