浏览代码

Revert "Removes autoscale option completely"

This reverts commit 8e0ab0f09254039cdf05f18ff283bcbb0d5d99be.
Ask Solem 8 年之前
父节点
当前提交
fca57b63f7

+ 1 - 1
CONTRIBUTING.rst

@@ -379,7 +379,7 @@ Tags
 ====
 
 - Tags are used exclusively for tagging releases. A release tag is
-named with the format ``vX.Y.Z`` -- for example ``v2.3.1``.
+  named with the format ``vX.Y.Z`` -- for example ``v2.3.1``.
 
 - Experimental releases contain an additional identifier ``vX.Y.Z-id`` --
   for example ``v3.0.0-rc1``.

+ 9 - 0
celery/app/control.py

@@ -319,6 +319,15 @@ class Control(object):
         """
         return self.broadcast('pool_shrink', {'n': n}, destination, **kwargs)
 
+    def autoscale(self, max, min, destination=None, **kwargs):
+        """Change worker(s) autoscale setting.
+
+        Supports the same arguments as :meth:`broadcast`.
+
+        """
+        return self.broadcast(
+            'autoscale', {'max': max, 'min': min}, destination, **kwargs)
+
     def broadcast(self, command, arguments=None, destination=None,
                   connection=None, reply=False, timeout=1, limit=None,
                   callback=None, channel=None, **extra_kwargs):

+ 1 - 0
celery/app/defaults.py

@@ -247,6 +247,7 @@ NAMESPACES = Namespace(
     worker=Namespace(
         __old__=OLD_NS_WORKER,
         agent=Option(None, type='string'),
+        autoscaler=Option('celery.worker.autoscale:Autoscaler'),
         concurrency=Option(0, type='int'),
         consumer=Option('celery.worker.consumer:Consumer', type='string'),
         direct=Option(False, type='bool', old={'celery_worker_direct'}),

+ 3 - 0
celery/apps/worker.py

@@ -199,6 +199,9 @@ class Worker(WorkController):
             if loader.startswith('celery.loaders'):  # pragma: no cover
                 loader = loader[14:]
             appr += ' ({0})'.format(loader)
+        if self.autoscale:
+            max, min = self.autoscale
+            concurrency = '{{min={0}, max={1}}}'.format(min, max)
         pool = self.pool_cls
         if not isinstance(pool, string_t):
             pool = pool.__module__

+ 11 - 0
celery/bin/worker.py

@@ -124,6 +124,15 @@ The :program:`celery worker` command (previously known as ``celeryd``)
     completed and the child process will be replaced afterwards.
     Default: no limit.
 
+.. cmdoption:: --autoscale
+
+    Enable autoscaling by providing
+    max_concurrency, min_concurrency. Example::
+
+        --autoscale=10,3
+
+    (always keep 3 processes, but grow to 10 if necessary)
+
 .. cmdoption:: --detach
 
     Start worker as a background process.
@@ -194,6 +203,7 @@ class worker(Command):
 
             $ celery worker -A proj --concurrency=4
             $ celery worker -A proj --concurrency=1000 -P eventlet
+            $ celery worker --autoscale=10,0
     """
 
     doc = HELP  # parse help from this too
@@ -320,6 +330,7 @@ class worker(Command):
             '--without-heartbeat', action='store_true', default=False,
         )
         fopts.add_argument('--heartbeat-interval', type=int)
+        fopts.add_argument('--autoscale')
 
         daemon_options(parser)
 

+ 5 - 1
celery/worker/__init__.py

@@ -87,6 +87,7 @@ class WorkController(object):
             'celery.worker.components:Timer',
             'celery.worker.components:StateDB',
             'celery.worker.components:Consumer',
+            'celery.worker.autoscale:WorkerComponent',
         }
 
     def __init__(self, app=None, hostname=None, **kwargs):
@@ -353,7 +354,9 @@ class WorkController(object):
     def setup_defaults(self, concurrency=None, loglevel='WARN', logfile=None,
                        task_events=None, pool=None, consumer_cls=None,
                        timer_cls=None, timer_precision=None,
-                       pool_putlocks=None, pool_restarts=None,
+                       autoscaler_cls=None,
+                       pool_putlocks=None,
+                       pool_restarts=None,
                        optimization=None, O=None,  # O maps to -O=fair
                        statedb=None,
                        time_limit=None,
@@ -382,6 +385,7 @@ class WorkController(object):
             'worker_timer_precision', timer_precision,
         )
         self.optimization = optimization or O
+        self.autoscaler_cls = either('worker_autoscaler', autoscaler_cls)
         self.pool_putlocks = either('worker_pool_putlocks', pool_putlocks)
         self.pool_restarts = either('worker_pool_restarts', pool_restarts)
         self.statedb = either('worker_state_db', statedb, state_db)

+ 163 - 0
celery/worker/autoscale.py

@@ -0,0 +1,163 @@
+# -*- coding: utf-8 -*-
+"""Pool Autoscaling.
+
+This module implements the internal thread responsible
+for growing and shrinking the pool according to the
+current autoscale settings.
+
+The autoscale thread is only enabled if
+the :option:`celery worker --autoscale` option is used.
+"""
+from __future__ import absolute_import, unicode_literals
+
+import os
+import threading
+
+from time import sleep
+
+from kombu.async.semaphore import DummyLock
+
+from celery import bootsteps
+from celery.five import monotonic
+from celery.utils.log import get_logger
+from celery.utils.threads import bgThread
+
+from . import state
+from .components import Pool
+
+__all__ = ['Autoscaler', 'WorkerComponent']
+
+logger = get_logger(__name__)
+debug, info, error = logger.debug, logger.info, logger.error
+
+AUTOSCALE_KEEPALIVE = float(os.environ.get('AUTOSCALE_KEEPALIVE', 30))
+
+
+class WorkerComponent(bootsteps.StartStopStep):
+    """Bootstep that starts the autoscaler thread/timer in the worker."""
+
+    label = 'Autoscaler'
+    conditional = True
+    requires = (Pool,)
+
+    def __init__(self, w, **kwargs):
+        self.enabled = w.autoscale
+        w.autoscaler = None
+
+    def create(self, w):
+        scaler = w.autoscaler = self.instantiate(
+            w.autoscaler_cls,
+            w.pool, w.max_concurrency, w.min_concurrency,
+            worker=w, mutex=DummyLock() if w.use_eventloop else None,
+        )
+        return scaler if not w.use_eventloop else None
+
+    def register_with_event_loop(self, w, hub):
+        w.consumer.on_task_message.add(w.autoscaler.maybe_scale)
+        hub.call_repeatedly(
+            w.autoscaler.keepalive, w.autoscaler.maybe_scale,
+        )
+
+
+class Autoscaler(bgThread):
+    """Background thread to autoscale pool workers."""
+
+    def __init__(self, pool, max_concurrency,
+                 min_concurrency=0, worker=None,
+                 keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
+        super(Autoscaler, self).__init__()
+        self.pool = pool
+        self.mutex = mutex or threading.Lock()
+        self.max_concurrency = max_concurrency
+        self.min_concurrency = min_concurrency
+        self.keepalive = keepalive
+        self._last_scale_up = None
+        self.worker = worker
+
+        assert self.keepalive, 'cannot scale down too fast.'
+
+    def body(self):
+        with self.mutex:
+            self.maybe_scale()
+        sleep(1.0)
+
+    def _maybe_scale(self, req=None):
+        procs = self.processes
+        cur = min(self.qty, self.max_concurrency)
+        if cur > procs:
+            self.scale_up(cur - procs)
+            return True
+        cur = max(self.qty, self.min_concurrency)
+        if cur < procs:
+            self.scale_down(procs - cur)
+            return True
+
+    def maybe_scale(self, req=None):
+        if self._maybe_scale(req):
+            self.pool.maintain_pool()
+
+    def update(self, max=None, min=None):
+        with self.mutex:
+            if max is not None:
+                if max < self.processes:
+                    self._shrink(self.processes - max)
+                self.max_concurrency = max
+            if min is not None:
+                if min > self.processes:
+                    self._grow(min - self.processes)
+                self.min_concurrency = min
+            return self.max_concurrency, self.min_concurrency
+
+    def force_scale_up(self, n):
+        with self.mutex:
+            new = self.processes + n
+            if new > self.max_concurrency:
+                self.max_concurrency = new
+            self._grow(n)
+
+    def force_scale_down(self, n):
+        with self.mutex:
+            new = self.processes - n
+            if new < self.min_concurrency:
+                self.min_concurrency = max(new, 0)
+            self._shrink(min(n, self.processes))
+
+    def scale_up(self, n):
+        self._last_scale_up = monotonic()
+        return self._grow(n)
+
+    def scale_down(self, n):
+        if self._last_scale_up and (
+                monotonic() - self._last_scale_up > self.keepalive):
+            return self._shrink(n)
+
+    def _grow(self, n):
+        info('Scaling up %s processes.', n)
+        self.pool.grow(n)
+        self.worker.consumer._update_prefetch_count(n)
+
+    def _shrink(self, n):
+        info('Scaling down %s processes.', n)
+        try:
+            self.pool.shrink(n)
+        except ValueError:
+            debug("Autoscaler won't scale down: all processes busy.")
+        except Exception as exc:
+            error('Autoscaler: scale_down: %r', exc, exc_info=True)
+        self.worker.consumer._update_prefetch_count(-n)
+
+    def info(self):
+        return {
+            'max': self.max_concurrency,
+            'min': self.min_concurrency,
+            'current': self.processes,
+            'qty': self.qty,
+        }
+
+    @property
+    def qty(self):
+        return len(state.reserved_requests)
+
+    @property
+    def processes(self):
+        return self.pool.num_processes

+ 9 - 1
celery/worker/components.py

@@ -12,6 +12,7 @@ from kombu.async.timer import Timer as _Timer
 from celery import bootsteps
 from celery._state import _set_task_join_will_block
 from celery.exceptions import ImproperlyConfigured
+from celery.five import string_t
 from celery.platforms import IS_WINDOWS
 from celery.utils.log import worker_logger as logger
 
@@ -105,6 +106,7 @@ class Pool(bootsteps.StartStopStep):
 
     Adds attributes:
 
+        * autoscale
         * pool
         * max_concurrency
         * min_concurrency
@@ -112,11 +114,17 @@ class Pool(bootsteps.StartStopStep):
 
     requires = (Hub,)
 
-    def __init__(self, w, **kwargs):
+    def __init__(self, w, autoscale=None, **kwargs):
         w.pool = None
         w.max_concurrency = None
         w.min_concurrency = w.concurrency
         self.optimization = w.optimization
+        if isinstance(autoscale, string_t):
+            max_c, _, min_c = autoscale.partition(',')
+            autoscale = [int(max_c), min_c and int(min_c) or 0]
+        w.autoscale = autoscale
+        if w.autoscale:
+            w.max_concurrency, w.min_concurrency = w.autoscale
         super(Pool, self).__init__(w, **kwargs)
 
     def close(self, w):

+ 2 - 3
celery/worker/consumer/consumer.py

@@ -246,9 +246,8 @@ class Consumer(object):
 
         Note:
             Currently pool grow operations will end up with an offset
-            of +1 if the initial size of the pool was 0 (this could
-            be the case with the old deprecated autoscale option, may consider
-            removing this now that it's no longer supported).
+            of +1 if the initial size of the pool was 0 (e.g.
+            :option:`--autoscale=1,0 <celery worker --autoscale>`).
         """
         num_processes = self.pool.num_processes
         if not self.initial_prefetch_count or not num_processes:

+ 23 - 4
celery/worker/control.py

@@ -470,8 +470,11 @@ def memdump(state, samples=10, **kwargs):  # pragma: no cover
 )
 def pool_grow(state, n=1, **kwargs):
     """Grow pool by n processes/threads."""
-    state.consumer.pool.grow(n)
-    state.consumer._update_prefetch_count(n)
+    if state.consumer.controller.autoscaler:
+        state.consumer.controller.autoscaler.force_scale_up(n)
+    else:
+        state.consumer.pool.grow(n)
+        state.consumer._update_prefetch_count(n)
     return ok('pool will grow')
 
 
@@ -481,8 +484,11 @@ def pool_grow(state, n=1, **kwargs):
 )
 def pool_shrink(state, n=1, **kwargs):
     """Shrink pool by n processes/threads."""
-    state.consumer.pool.shrink(n)
-    state.consumer._update_prefetch_count(-n)
+    if state.consumer.controller.autoscaler:
+        state.consumer.controller.autoscaler.force_scale_down(n)
+    else:
+        state.consumer.pool.shrink(n)
+        state.consumer._update_prefetch_count(-n)
     return ok('pool will shrink')
 
 
@@ -496,6 +502,19 @@ def pool_restart(state, modules=None, reload=False, reloader=None, **kwargs):
         raise ValueError('Pool restarts not enabled')
 
 
+@control_command(
+    args=[('max', int), ('min', int)],
+    signature='[max [min]]',
+)
+def autoscale(state, max=None, min=None):
+    """Modify autoscale settings."""
+    autoscaler = state.consumer.controller.autoscaler
+    if autoscaler:
+        max_, min_ = autoscaler.update(max, min)
+        return ok('autoscale now max={0} min={1}'.format(max_, min_))
+    raise ValueError('Autoscale not enabled')
+
+
 @control_command()
 def shutdown(state, msg='Got shutdown from remote', **kwargs):
     """Shutdown worker(s)."""

+ 11 - 0
docs/internals/reference/celery.worker.autoscale.rst

@@ -0,0 +1,11 @@
+========================================
+ ``celery.worker.autoscale``
+========================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.worker.autoscale
+
+.. automodule:: celery.worker.autoscale
+    :members:
+    :undoc-members:

+ 1 - 0
docs/internals/reference/index.rst

@@ -13,6 +13,7 @@
     celery.worker.heartbeat
     celery.worker.control
     celery.worker.pidbox
+    celery.worker.autoscale
     celery.concurrency
     celery.concurrency.solo
     celery.concurrency.prefork

+ 12 - 0
docs/userguide/configuration.rst

@@ -132,6 +132,7 @@ rush in moving to the new settings format.
 ``CELERYD_TASK_TIME_LIMIT``            :setting:`task_time_limit`
 ``CELERY_TRACK_STARTED``               :setting:`task_track_started`
 ``CELERYD_AGENT``                      :setting:`worker_agent`
+``CELERYD_AUTOSCALER``                 :setting:`worker_autoscaler`
 ``CELERYD_CONCURRENCY``                :setting:`worker_concurrency`
 ``CELERYD_CONSUMER``                   :setting:`worker_consumer`
 ``CELERY_WORKER_DIRECT``               :setting:`worker_direct`
@@ -2196,6 +2197,17 @@ Default: Disabled by default.
 If enabled the worker pool can be restarted using the
 :control:`pool_restart` remote control command.
 
+.. setting:: worker_autoscaler
+
+``worker_autoscaler``
+~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 2.2
+
+Default: ``"celery.worker.autoscale:Autoscaler"``.
+
+Name of the autoscaler class to use.
+
 .. setting:: worker_consumer
 
 ``worker_consumer``

+ 33 - 1
docs/userguide/extending.rst

@@ -204,6 +204,38 @@ Attributes
         class WorkerStep(bootsteps.StartStopStep):
             requires = {'celery.worker.components:Statedb'}
 
+.. _extending-worker-autoscaler:
+
+.. attribute:: autoscaler
+
+    :class:`~celery.worker.autoscaler.Autoscaler` used to automatically grow
+    and shrink the number of processes in the pool.
+
+    This is only defined if the ``autoscale`` argument is enabled.
+
+    Your worker bootstep must require the `Autoscaler` bootstep to use this:
+
+    .. code-block:: python
+
+        class WorkerStep(bootsteps.StartStopStep):
+            requires = ('celery.worker.autoscaler:Autoscaler',)
+
+.. _extending-worker-autoreloader:
+
+.. attribute:: autoreloader
+
+    :class:`~celery.worker.autoreloder.Autoreloader` used to automatically
+    reload use code when the file-system changes.
+
+    This is only defined if the ``autoreload`` argument is enabled.
+    Your worker bootstep must require the `Autoreloader` bootstep to use this;
+
+    .. code-block:: python
+
+        class WorkerStep(bootsteps.StartStopStep):
+            requires = ('celery.worker.autoreloader:Autoreloader',)
+>>>>>>> parent of 8e0ab0f... Removes autoscale option completely
+
 Example worker bootstep
 -----------------------
 
@@ -637,7 +669,7 @@ will show us more information about the boot process:
     [2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
     <celery.apps.worker.Worker object at 0x101ad8410> is in init
     [2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
-        {Hub, Pool, Timer, StateDB, InfoStep, Beat, Consumer}
+        {Hub, Pool, Timer, StateDB, Autoscaler, InfoStep, Beat, Consumer}
     [2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
     [2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
     <celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init

+ 2 - 1
docs/userguide/monitoring.rst

@@ -230,7 +230,7 @@ Features
 
     - View worker status and statistics
     - Shutdown and restart worker instances
-    - Control worker pool size
+    - Control worker pool size and autoscale settings
     - View and modify the queues a worker instance consumes from
     - View currently running tasks
     - View scheduled tasks (ETA/countdown)
@@ -246,6 +246,7 @@ Features
     - Restart worker’s pool
     - Grow worker’s pool
     - Shrink worker’s pool
+    - Autoscale worker pool
     - Start consuming from a queue
     - Stop consuming from a queue
     - List tasks

+ 33 - 2
docs/userguide/workers.rst

@@ -189,8 +189,8 @@ filename depending on the process that'll eventually need to open the file.
 This can be used to specify one log file per child process.
 
 Note that the numbers will stay within the process limit even if processes
-exit or if ``max-tasks-per-child``/time limits are used. That is, the number
-is the *process index*, not the process count or pid.
+exit or if autoscale/``maxtasksperchild``/time limits are used.  That is, the number
+is the *process index* not the process count or pid.
 
 * ``%i`` - Pool process index or 0 if MainProcess.
 
@@ -565,6 +565,37 @@ The option can be set using the workers
 :option:`--max-memory-per-child <celery worker --max-memory-per-child>` argument
 or using the :setting:`worker_max_memory_per_child` setting.
 
+.. _worker-autoscaling:
+
+Autoscaling
+===========
+
+.. versionadded:: 2.2
+
+:pool support: *prefork*, *gevent*
+
+The *autoscaler* component is used to dynamically resize the pool
+based on load:
+
+- The autoscaler adds more pool processes when there is work to do,
+    - and starts removing processes when the workload is low.
+
+It's enabled by the :option:`--autoscale <celery worker --autoscale>` option,
+which needs two numbers: the maximum and minimum number of pool processes:
+
+.. code-block:: text
+
+        --autoscale=AUTOSCALE
+             Enable autoscaling by providing
+             max_concurrency,min_concurrency.  Example:
+               --autoscale=10,3 (always keep 3 processes, but grow to
+              10 if necessary).
+
+You can also define your own rules for the autoscaler by subclassing
+:class:`~celery.worker.autoscaler.Autoscaler`.
+Some ideas for metrics include load average or the amount of memory available.
+You can specify a custom autoscaler with the :setting:`worker_autoscaler` setting.
+
 .. _worker-queues:
 
 Queues

+ 2 - 2
extra/bash-completion/celery.bash

@@ -75,7 +75,7 @@ _celery()
         COMPREPLY=( $(compgen -W '--concurrency= --pool= --purge --logfile=
         --loglevel= --hostname= --beat --schedule= --scheduler= --statedb= --events
         --time-limit= --soft-time-limit= --max-tasks-per-child= --queues=
-        --include= --pidfile= $fargs' -- ${cur} ) )
+        --include= --pidfile= --autoscale $fargs' -- ${cur} ) )
         return 0
         ;;
     inspect)
@@ -84,7 +84,7 @@ _celery()
         return 0
         ;;
     control)
-        COMPREPLY=( $(compgen -W 'add_consumer cancel_consumer
+        COMPREPLY=( $(compgen -W 'add_consumer autoscale cancel_consumer
         disable_events enable_events pool_grow pool_shrink
         rate_limit time_limit --help $controlargs $fargs' -- ${cur}) )
         return 0

+ 2 - 0
extra/zsh-completion/celery.zsh

@@ -55,6 +55,7 @@ case "$words[1]" in
     '(-Q --queues=)'{-Q,--queues=}'[List of queues to enable for this worker, separated by comma. By default all configured queues are enabled.]' \
     '(-I --include=)'{-I,--include=}'[Comma separated list of additional modules to import.]' \
     '(--pidfile=)--pidfile=[Optional file used to store the process pid.]' \
+    '(--autoscale=)--autoscale=[Enable autoscaling by providing max_concurrency, min_concurrency.]' \
     compadd -a ifargs
     ;;
     inspect)
@@ -73,6 +74,7 @@ case "$words[1]" in
     control)
     _values -s \
     'add_consumer[tell worker(s) to start consuming a queue]' \
+    'autoscale[change autoscale settings]' \
     'cancel_consumer[tell worker(s) to stop consuming a queue]' \
     'disable_events[tell worker(s) to disable events]' \
     'enable_events[tell worker(s) to enable events]' \

+ 3 - 5
t/unit/app/test_app.py

@@ -298,7 +298,7 @@ class test_App:
                 task_default_delivery_mode=63,
                 worker_agent='foo:Barz',
                 CELERYD_CONSUMER='foo:Fooz',
-                CELERYD_POOL='foo:Xuzzy',
+                CELERYD_AUTOSCALER='foo:Xuzzy',
             )
             with pytest.raises(ImproperlyConfigured):
                 assert app.conf.worker_consumer == 'foo:Fooz'
@@ -311,11 +311,9 @@ class test_App:
                 worker_agent='foo:Barz',
                 CELERYD_CONSUMER='foo:Fooz',
                 worker_consumer='foo:Fooz',
-                CELERYD_POOL='foo:Xuzzy',
-                worker_pool='foo:Xuzzy'
+                CELERYD_AUTOSCALER='foo:Xuzzy',
+                worker_autoscaler='foo:Xuzzy'
             )
-            assert app.conf.task_always_eager == 4
-            assert app.conf.worker_pool == 'foo:Xuzzy'
 
     def test_pending_configuration__setdefault(self):
         with self.Celery(broker='foo://bar') as app:

+ 9 - 0
t/unit/bin/test_worker.py

@@ -146,6 +146,8 @@ class test_Worker:
             assert worker.startup_info()
             worker.loglevel = logging.INFO
             assert worker.startup_info()
+            worker.autoscale = 13, 10
+            assert worker.startup_info()
 
             prev_loader = self.app.loader
             worker = self.Worker(
@@ -222,6 +224,13 @@ class test_Worker:
                 routing_key='image',
             )
 
+    def test_autoscale_argument(self):
+        with mock.stdouts():
+            worker1 = self.Worker(app=self.app, autoscale='10,3')
+            assert worker1.autoscale == [10, 3]
+            worker2 = self.Worker(app=self.app, autoscale='10')
+            assert worker2.autoscale == [10, 0]
+
     def test_include_argument(self):
         worker1 = self.Worker(app=self.app, include='os')
         assert worker1.include == ['os']

+ 215 - 0
t/unit/worker/test_autoscale.py

@@ -0,0 +1,215 @@
+from __future__ import absolute_import, unicode_literals
+
+import pytest
+import sys
+
+from case import Mock, mock, patch
+
+from celery.concurrency.base import BasePool
+from celery.five import monotonic
+from celery.worker import state
+from celery.worker import autoscale
+from celery.utils.objects import Bunch
+
+
+class MockPool(BasePool):
+
+    shrink_raises_exception = False
+    shrink_raises_ValueError = False
+
+    def __init__(self, *args, **kwargs):
+        super(MockPool, self).__init__(*args, **kwargs)
+        self._pool = Bunch(_processes=self.limit)
+
+    def grow(self, n=1):
+        self._pool._processes += n
+
+    def shrink(self, n=1):
+        if self.shrink_raises_exception:
+            raise KeyError('foo')
+        if self.shrink_raises_ValueError:
+            raise ValueError('foo')
+        self._pool._processes -= n
+
+    @property
+    def num_processes(self):
+        return self._pool._processes
+
+
+class test_WorkerComponent:
+
+    def test_register_with_event_loop(self):
+        parent = Mock(name='parent')
+        parent.autoscale = True
+        parent.consumer.on_task_message = set()
+        w = autoscale.WorkerComponent(parent)
+        assert parent.autoscaler is None
+        assert w.enabled
+
+        hub = Mock(name='hub')
+        w.create(parent)
+        w.register_with_event_loop(parent, hub)
+        assert (parent.autoscaler.maybe_scale in
+                parent.consumer.on_task_message)
+        hub.call_repeatedly.assert_called_with(
+            parent.autoscaler.keepalive, parent.autoscaler.maybe_scale,
+        )
+
+        parent.hub = hub
+        hub.on_init = []
+        w.instantiate = Mock()
+        w.register_with_event_loop(parent, Mock(name='loop'))
+        assert parent.consumer.on_task_message
+
+
+class test_Autoscaler:
+
+    def setup(self):
+        self.pool = MockPool(3)
+
+    def test_stop(self):
+
+        class Scaler(autoscale.Autoscaler):
+            alive = True
+            joined = False
+
+            def is_alive(self):
+                return self.alive
+
+            def join(self, timeout=None):
+                self.joined = True
+
+        worker = Mock(name='worker')
+        x = Scaler(self.pool, 10, 3, worker=worker)
+        x._is_stopped.set()
+        x.stop()
+        assert x.joined
+        x.joined = False
+        x.alive = False
+        x.stop()
+        assert not x.joined
+
+    @mock.sleepdeprived(module=autoscale)
+    def test_body(self):
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
+        x.body()
+        assert x.pool.num_processes == 3
+        _keep = [Mock(name='req{0}'.format(i)) for i in range(20)]
+        [state.task_reserved(m) for m in _keep]
+        x.body()
+        x.body()
+        assert x.pool.num_processes == 10
+        worker.consumer._update_prefetch_count.assert_called()
+        state.reserved_requests.clear()
+        x.body()
+        assert x.pool.num_processes == 10
+        x._last_scale_up = monotonic() - 10000
+        x.body()
+        assert x.pool.num_processes == 3
+        worker.consumer._update_prefetch_count.assert_called()
+
+    def test_run(self):
+
+        class Scaler(autoscale.Autoscaler):
+            scale_called = False
+
+            def body(self):
+                self.scale_called = True
+                self._is_shutdown.set()
+
+        worker = Mock(name='worker')
+        x = Scaler(self.pool, 10, 3, worker=worker)
+        x.run()
+        assert x._is_shutdown.isSet()
+        assert x._is_stopped.isSet()
+        assert x.scale_called
+
+    def test_shrink_raises_exception(self):
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
+        x.scale_up(3)
+        x.pool.shrink_raises_exception = True
+        x._shrink(1)
+
+    @patch('celery.worker.autoscale.debug')
+    def test_shrink_raises_ValueError(self, debug):
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
+        x.scale_up(3)
+        x._last_scale_up = monotonic() - 10000
+        x.pool.shrink_raises_ValueError = True
+        x.scale_down(1)
+        assert debug.call_count
+
+    def test_update_and_force(self):
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
+        assert x.processes == 3
+        x.force_scale_up(5)
+        assert x.processes == 8
+        x.update(5, None)
+        assert x.processes == 5
+        x.force_scale_down(3)
+        assert x.processes == 2
+        x.update(None, 3)
+        assert x.processes == 3
+        x.force_scale_down(1000)
+        assert x.min_concurrency == 0
+        assert x.processes == 0
+        x.force_scale_up(1000)
+        x.min_concurrency = 1
+        x.force_scale_down(1)
+
+        x.update(max=300, min=10)
+        x.update(max=300, min=2)
+        x.update(max=None, min=None)
+
+    def test_info(self):
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
+        info = x.info()
+        assert info['max'] == 10
+        assert info['min'] == 3
+        assert info['current'] == 3
+
+    @patch('os._exit')
+    def test_thread_crash(self, _exit):
+
+        class _Autoscaler(autoscale.Autoscaler):
+
+            def body(self):
+                self._is_shutdown.set()
+                raise OSError('foo')
+        worker = Mock(name='worker')
+        x = _Autoscaler(self.pool, 10, 3, worker=worker)
+
+        stderr = Mock()
+        p, sys.stderr = sys.stderr, stderr
+        try:
+            x.run()
+        finally:
+            sys.stderr = p
+        _exit.assert_called_with(1)
+        stderr.write.assert_called()
+
+    @mock.sleepdeprived(module=autoscale)
+    def test_no_negative_scale(self):
+        total_num_processes = []
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
+        x.body()  # the body func scales up or down
+
+        _keep = [Mock(name='req{0}'.format(i)) for i in range(35)]
+        for req in _keep:
+            state.task_reserved(req)
+            x.body()
+            total_num_processes.append(self.pool.num_processes)
+
+        for req in _keep:
+            state.task_ready(req)
+            x.body()
+            total_num_processes.append(self.pool.num_processes)
+
+        assert all(x.min_concurrency <= i <= x.max_concurrency
+                   for i in total_num_processes)

+ 21 - 0
t/unit/worker/test_control.py

@@ -26,6 +26,7 @@ hostname = socket.gethostname()
 
 
 class WorkController(object):
+    autoscaler = None
 
     def stats(self):
         return {'total': worker_state.total_count}
@@ -334,6 +335,11 @@ class test_ControlPanel:
 
         panel.state.consumer = Mock()
         panel.state.consumer.controller = Mock()
+        sc = panel.state.consumer.controller.autoscaler = Mock()
+        panel.handle('pool_grow')
+        sc.force_scale_up.assert_called()
+        panel.handle('pool_shrink')
+        sc.force_scale_down.assert_called()
 
     def test_add__cancel_consumer(self):
 
@@ -511,6 +517,21 @@ class test_ControlPanel:
         finally:
             worker_state.task_ready(request)
 
+    def test_autoscale(self):
+        self.panel.state.consumer = Mock()
+        self.panel.state.consumer.controller = Mock()
+        sc = self.panel.state.consumer.controller.autoscaler = Mock()
+        sc.update.return_value = 10, 2
+        m = {'method': 'autoscale',
+             'destination': hostname,
+             'arguments': {'max': '10', 'min': '2'}}
+        r = self.panel.handle_message(m, None)
+        assert 'ok' in r
+
+        self.panel.state.consumer.controller.autoscaler = None
+        r = self.panel.handle_message(m, None)
+        assert 'error' in r
+
     def test_ping(self):
         m = {'method': 'ping',
              'destination': hostname}

+ 7 - 0
t/unit/worker/test_worker.py

@@ -781,6 +781,13 @@ class test_WorkController(ConsumerCase):
         assert worker.beat
         assert worker.beat in [w.obj for w in worker.steps]
 
+    def test_with_autoscaler(self):
+        worker = self.create_worker(
+            autoscale=[10, 3], send_events=False,
+            timer_cls='celery.utils.timer2.Timer',
+        )
+        assert worker.autoscaler
+
     def test_dont_stop_or_terminate(self):
         worker = self.app.WorkController(concurrency=1, loglevel=0)
         worker.stop()