123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- # -*- coding: utf-8 -*-
- """Pool Autoscaling.
- This module implements the internal thread responsible
- for growing and shrinking the pool according to the
- current autoscale settings.
- The autoscale thread is only enabled if
- the :option:`celery worker --autoscale` option is used.
- """
- from __future__ import absolute_import, unicode_literals
- import os
- import threading
- from time import sleep
- from kombu.async.semaphore import DummyLock
- from celery import bootsteps
- from celery.five import monotonic
- from celery.utils.log import get_logger
- from celery.utils.threads import bgThread
- from . import state
- from .components import Pool
# Public API of this module.
__all__ = ['Autoscaler', 'WorkerComponent']

logger = get_logger(__name__)
# Local aliases so the hot paths use fast LOAD_FAST lookups.
debug, info, error = logger.debug, logger.info, logger.error

# Seconds that must elapse after a scale-up before a scale-down is
# allowed; overridable via the AUTOSCALE_KEEPALIVE environment variable.
AUTOSCALE_KEEPALIVE = float(os.environ.get('AUTOSCALE_KEEPALIVE', 30))
class WorkerComponent(bootsteps.StartStopStep):
    """Bootstep that starts the autoscaler thread/timer in the worker."""

    label = 'Autoscaler'
    conditional = True
    requires = (Pool,)

    def __init__(self, w, **kwargs):
        # Only enabled when the worker was started with --autoscale.
        self.enabled = w.autoscale
        w.autoscaler = None

    def create(self, w):
        autoscaler = self.instantiate(
            w.autoscaler_cls,
            w.pool, w.max_concurrency, w.min_concurrency,
            worker=w, mutex=DummyLock() if w.use_eventloop else None,
        )
        w.autoscaler = autoscaler
        # In event-loop mode scaling is driven by timer callbacks (see
        # register_with_event_loop), so no background thread is returned.
        if w.use_eventloop:
            return None
        return autoscaler

    def register_with_event_loop(self, w, hub):
        # Re-evaluate scaling on every incoming task message, and also
        # periodically (every keepalive seconds) via the hub timer.
        w.consumer.on_task_message.add(w.autoscaler.maybe_scale)
        hub.call_repeatedly(
            w.autoscaler.keepalive, w.autoscaler.maybe_scale,
        )
class Autoscaler(bgThread):
    """Background thread to autoscale pool workers.

    Compares current demand (the number of reserved task requests)
    against the configured ``min_concurrency``/``max_concurrency``
    bounds and grows or shrinks the pool accordingly.  In event-loop
    mode the same logic is driven by hub callbacks instead of this
    thread's run loop.

    Arguments:
        pool: the worker pool to resize.
        max_concurrency (int): upper bound on pool processes.
        min_concurrency (int): lower bound on pool processes.
        worker: the worker instance; used to adjust the consumer
            prefetch count as the pool resizes.
        keepalive (float): seconds that must pass after a scale-up
            before a scale-down is allowed (anti-thrash).
        mutex: lock serializing scaling operations; the worker passes
            a DummyLock when an event loop drives scaling.

    Raises:
        ValueError: if ``keepalive`` is falsy (zero keepalive would
            allow scaling down immediately after scaling up).
    """

    def __init__(self, pool, max_concurrency,
                 min_concurrency=0, worker=None,
                 keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
        super(Autoscaler, self).__init__()
        self.pool = pool
        self.mutex = mutex or threading.Lock()
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        self._last_scale_up = None
        self.worker = worker
        # Explicit check instead of ``assert``: assertions are stripped
        # under ``python -O``, which would silently accept a zero
        # keepalive and defeat the anti-thrash protection.
        if not self.keepalive:
            raise ValueError('cannot scale down too fast.')

    def body(self):
        # One iteration of the background thread's run loop
        # (bgThread calls this repeatedly; unused in event-loop mode).
        with self.mutex:
            self.maybe_scale()
        sleep(1.0)

    def _maybe_scale(self, req=None):
        """Scale toward current demand; return True if the pool changed."""
        procs = self.processes
        # Grow toward demand, capped at the configured maximum.
        cur = min(self.qty, self.max_concurrency)
        if cur > procs:
            self.scale_up(cur - procs)
            return True
        # Shrink toward demand, but never below the configured minimum.
        cur = max(self.qty, self.min_concurrency)
        if cur < procs:
            self.scale_down(procs - cur)
            return True
        return False  # explicit: no scaling was needed

    def maybe_scale(self, req=None):
        # Only pay for pool maintenance when the size actually changed.
        if self._maybe_scale(req):
            self.pool.maintain_pool()

    def update(self, max=None, min=None):
        """Update concurrency bounds; returns the new (max, min).

        ``max``/``min`` shadow builtins, but the names are part of the
        public interface (remote-control callers pass them as keywords),
        so they cannot be renamed.
        """
        with self.mutex:
            if max is not None:
                if max < self.processes:
                    # New ceiling is below current size: shrink first.
                    self._shrink(self.processes - max)
                self.max_concurrency = max
            if min is not None:
                if min > self.processes:
                    # New floor is above current size: grow first.
                    self._grow(min - self.processes)
                self.min_concurrency = min
            return self.max_concurrency, self.min_concurrency

    def force_scale_up(self, n):
        """Grow by ``n`` processes, raising max_concurrency if needed."""
        with self.mutex:
            new = self.processes + n
            if new > self.max_concurrency:
                self.max_concurrency = new
            self._grow(n)

    def force_scale_down(self, n):
        """Shrink by up to ``n`` processes, lowering min_concurrency if needed."""
        with self.mutex:
            new = self.processes - n
            if new < self.min_concurrency:
                # Never allow a negative minimum.
                self.min_concurrency = max(new, 0)
            # Clamp so we never try to shrink below zero processes.
            self._shrink(min(n, self.processes))

    def scale_up(self, n):
        # Record when we last grew so scale_down can honor keepalive.
        self._last_scale_up = monotonic()
        return self._grow(n)

    def scale_down(self, n):
        # Only shrink if we have grown at least once before AND the
        # keepalive interval has elapsed since then (anti-thrash).
        if self._last_scale_up and (
                monotonic() - self._last_scale_up > self.keepalive):
            return self._shrink(n)

    def _grow(self, n):
        info('Scaling up %s processes.', n)
        self.pool.grow(n)
        self.worker.consumer._update_prefetch_count(n)

    def _shrink(self, n):
        info('Scaling down %s processes.', n)
        try:
            self.pool.shrink(n)
        except ValueError:
            # Best-effort: the pool refuses to shrink while all
            # processes are busy; don't treat that as an error.
            debug("Autoscaler won't scale down: all processes busy.")
        except Exception as exc:
            error('Autoscaler: scale_down: %r', exc, exc_info=True)
        # NOTE(review): prefetch is reduced even when shrink() failed —
        # presumably deliberate backpressure; confirm before changing.
        self.worker.consumer._update_prefetch_count(-n)

    def info(self):
        """Return a dict of current autoscaler stats (for inspection)."""
        return {
            'max': self.max_concurrency,
            'min': self.min_concurrency,
            'current': self.processes,
            'qty': self.qty,
        }

    @property
    def qty(self):
        # Current demand: number of reserved (not yet completed) requests.
        return len(state.reserved_requests)

    @property
    def processes(self):
        # Current pool size.
        return self.pool.num_processes
|