autoscale.py 4.8 KB

# -*- coding: utf-8 -*-
"""Pool Autoscaling.

This module implements the internal thread responsible
for growing and shrinking the pool according to the
current autoscale settings.

The autoscale thread is only enabled if
the :option:`celery worker --autoscale` option is used.
"""
from __future__ import absolute_import, unicode_literals

import os
import threading

from time import sleep

from kombu.async.semaphore import DummyLock

from celery import bootsteps
from celery.five import monotonic
from celery.utils.log import get_logger
from celery.utils.threads import bgThread

from . import state
from .components import Pool

__all__ = ['Autoscaler', 'WorkerComponent']

logger = get_logger(__name__)
debug, info, error = logger.debug, logger.info, logger.error

AUTOSCALE_KEEPALIVE = float(os.environ.get('AUTOSCALE_KEEPALIVE', 30))


class WorkerComponent(bootsteps.StartStopStep):
    """Bootstep that starts the autoscaler thread/timer in the worker."""

    label = 'Autoscaler'
    conditional = True
    requires = (Pool,)

    def __init__(self, w, **kwargs):
        self.enabled = w.autoscale
        w.autoscaler = None

    def create(self, w):
        scaler = w.autoscaler = self.instantiate(
            w.autoscaler_cls,
            w.pool, w.max_concurrency, w.min_concurrency,
            worker=w, mutex=DummyLock() if w.use_eventloop else None,
        )
        return scaler if not w.use_eventloop else None

    def register_with_event_loop(self, w, hub):
        w.consumer.on_task_message.add(w.autoscaler.maybe_scale)
        hub.call_repeatedly(
            w.autoscaler.keepalive, w.autoscaler.maybe_scale,
        )
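
# Note: ``create()`` returns the autoscaler only when the worker has no
# event loop, in which case it runs as a background thread.  With an event
# loop, ``register_with_event_loop()`` drives ``maybe_scale()`` instead:
# once per incoming task message, and on a repeating timer that fires every
# ``keepalive`` seconds.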


class Autoscaler(bgThread):
    """Background thread to autoscale pool workers."""

    def __init__(self, pool, max_concurrency,
                 min_concurrency=0, worker=None,
                 keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
        super(Autoscaler, self).__init__()
        self.pool = pool
        self.mutex = mutex or threading.Lock()
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        self._last_scale_up = None
        self.worker = worker

        assert self.keepalive, 'cannot scale down too fast.'

    def body(self):
        with self.mutex:
            self.maybe_scale()
        sleep(1.0)

    def _maybe_scale(self, req=None):
        procs = self.processes
        cur = min(self.qty, self.max_concurrency)
        if cur > procs:
            self.scale_up(cur - procs)
            return True
        cur = max(self.qty, self.min_concurrency)
        if cur < procs:
            self.scale_down(procs - cur)
            return True
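
    # ``qty`` is the number of task requests the worker currently holds
    # (``state.reserved_requests``) and ``processes`` the current pool size.
    # The target is clamped to [min_concurrency, max_concurrency], so the
    # pool grows toward demand and shrinks again once reservations drop
    # below the current size.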

    def maybe_scale(self, req=None):
        if self._maybe_scale(req):
            self.pool.maintain_pool()

    def update(self, max=None, min=None):
        with self.mutex:
            if max is not None:
                if max < self.processes:
                    self._shrink(self.processes - max)
                self.max_concurrency = max
            if min is not None:
                if min > self.processes:
                    self._grow(min - self.processes)
                self.min_concurrency = min
            return self.max_concurrency, self.min_concurrency
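
    # ``update()`` applies new limits to a running worker; it is what the
    # ``autoscale`` remote-control command ends up calling, e.g.
    # (illustrative, assuming an app named ``proj``)::
    #
    #     celery -A proj control autoscale 10 3
    #
    # adjusts the max/min bounds without restarting the worker.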

    def force_scale_up(self, n):
        with self.mutex:
            new = self.processes + n
            if new > self.max_concurrency:
                self.max_concurrency = new
            self._grow(n)

    def force_scale_down(self, n):
        with self.mutex:
            new = self.processes - n
            if new < self.min_concurrency:
                self.min_concurrency = max(new, 0)
            self._shrink(min(n, self.processes))

    def scale_up(self, n):
        self._last_scale_up = monotonic()
        return self._grow(n)

    def scale_down(self, n):
        if self._last_scale_up and (
                monotonic() - self._last_scale_up > self.keepalive):
            return self._shrink(n)
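
    # ``keepalive`` (AUTOSCALE_KEEPALIVE, 30 seconds by default) is the
    # minimum time that must pass after the last scale-up before the pool
    # may shrink again, which keeps the autoscaler from thrashing under
    # bursty load.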

    def _grow(self, n):
        info('Scaling up %s processes.', n)
        self.pool.grow(n)
        self.worker.consumer._update_prefetch_count(n)

    def _shrink(self, n):
        info('Scaling down %s processes.', n)
        try:
            self.pool.shrink(n)
        except ValueError:
            debug("Autoscaler won't scale down: all processes busy.")
        except Exception as exc:
            error('Autoscaler: scale_down: %r', exc, exc_info=True)
        self.worker.consumer._update_prefetch_count(-n)

    def info(self):
        return {
            'max': self.max_concurrency,
            'min': self.min_concurrency,
            'current': self.processes,
            'qty': self.qty,
        }

    @property
    def qty(self):
        return len(state.reserved_requests)

    @property
    def processes(self):
        return self.pool.num_processes