Browse Source

Add options for exponential backoff with task autoretry (#4101)

* Add options for exponential backoff with task autoretry

* Add test for exponential backoff

* closer to a fixed test

* Move autoretry backoff functionality inside run wrapper

* Add a test for jitter

* Correct for semantics of `random.randrange()`

`random.randrange()` treats the argument it receives as just *outside* the
bound of possible return values. For example, if you call
`random.randrange(2)`, you might get 0 or 1, but you'll never get 2.
Since we want to allow the `retry_jitter` parameter to occasionally apply no
jitter at all, we need to add one to the value we pass to `randrange()`,
so that there's a chance that we receive that original value back.

* Put side_effect on patch lines

* Fix flake8

* Add celery.utils.time.get_exponential_backoff_interval

* Use exponential backoff calculation from utils in task

* Update docs around retry_jitter

* Remove unnecessary random.choice patching

* Update task auto-retry documentation

* PEP8: remove unused import

* PEP8: remove trailing whitespace

* PEP8: Fix E123 warning
David Baumgold 7 years ago
parent
commit
0d5b840af1
5 changed files with 190 additions and 2 deletions
  1. 11 1
      celery/app/base.py
  2. 19 0
      celery/utils/time.py
  3. 64 0
      docs/userguide/tasks.rst
  4. 58 0
      t/unit/tasks/test_tasks.py
  5. 38 1
      t/unit/utils/test_time.py

+ 11 - 1
celery/app/base.py

@@ -36,7 +36,7 @@ from celery.utils import abstract
 from celery.utils.collections import AttributeDictMixin
 from celery.utils.dispatch import Signal
 from celery.utils.functional import first, maybe_list, head_from_fun
-from celery.utils.time import timezone
+from celery.utils.time import timezone, get_exponential_backoff_interval
 from celery.utils.imports import gen_task_name, instantiate, symbol_by_name
 from celery.utils.log import get_logger
 from celery.utils.objects import FallbackContext, mro_lookup
@@ -463,6 +463,9 @@ class Celery(object):
 
             autoretry_for = tuple(options.get('autoretry_for', ()))
             retry_kwargs = options.get('retry_kwargs', {})
+            retry_backoff = int(options.get('retry_backoff', False))
+            retry_backoff_max = int(options.get('retry_backoff_max', 600))
+            retry_jitter = options.get('retry_jitter', True)
 
             if autoretry_for and not hasattr(task, '_orig_run'):
 
@@ -471,6 +474,13 @@ class Celery(object):
                     try:
                         return task._orig_run(*args, **kwargs)
                     except autoretry_for as exc:
+                        if retry_backoff:
+                            retry_kwargs['countdown'] = \
+                                get_exponential_backoff_interval(
+                                    factor=retry_backoff,
+                                    retries=task.request.retries,
+                                    maximum=retry_backoff_max,
+                                    full_jitter=retry_jitter)
                         raise task.retry(exc=exc, **retry_kwargs)
 
                 task._orig_run, task.run = task.run, run

+ 19 - 0
celery/utils/time.py

@@ -4,6 +4,7 @@ from __future__ import absolute_import, print_function, unicode_literals
 
 import numbers
 import os
+import random
 import sys
 import time as _time
 
@@ -27,6 +28,7 @@ __all__ = [
     'humanize_seconds', 'maybe_iso8601', 'is_naive',
     'make_aware', 'localize', 'to_utc', 'maybe_make_aware',
     'ffwd', 'utcoffset', 'adjust_timestamp',
+    'get_exponential_backoff_interval',
 ]
 
 PY3 = sys.version_info[0] == 3
@@ -383,3 +385,20 @@ def utcoffset(time=_time, localtime=_time.localtime):
 def adjust_timestamp(ts, offset, here=utcoffset):
     """Adjust timestamp based on provided utcoffset."""
     return ts - (offset - here()) * 3600
+
+
+def get_exponential_backoff_interval(
+    factor,
+    retries,
+    maximum,
+    full_jitter=False
+):
+    """Calculate the exponential backoff wait time.
+
+    Arguments:
+        factor (int): Base delay in seconds.  The un-jittered delay is
+            ``factor * 2 ** retries``.
+        retries (int): Number of retries attempted so far.
+        maximum (int): Upper bound, in seconds, for the delay.
+        full_jitter (bool): If true, return a random integer between
+            zero and the capped delay (both inclusive) instead of the
+            capped delay itself.
+
+    Returns:
+        int: The delay in seconds.  Never negative, and at most
+        ``maximum`` when ``maximum`` is non-negative.
+    """
+    # Apply the cap *before* the jitter.  Drawing from the uncapped value
+    # and clamping afterwards would return ``maximum`` almost every time
+    # once the exponential term outgrows it, defeating the jitter.
+    # Will be zero if factor equals 0
+    countdown = min(maximum, factor * (2 ** retries))
+    # Full jitter according to
+    # https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
+    if full_jitter:
+        # randrange() excludes its stop value, hence the ``+ 1``.  Clamp to
+        # zero first: a non-positive stop makes randrange() raise ValueError.
+        countdown = random.randrange(max(0, countdown) + 1)
+    # Account for negative values (negative factor or maximum).
+    return max(0, countdown)

+ 64 - 0
docs/userguide/tasks.rst

@@ -746,6 +746,68 @@ If you want to automatically retry on any error, simply use:
     def x():
         ...
 
+.. versionadded:: 4.1
+
+If your tasks depend on another service, like making a request to an API,
+then it's a good idea to use `exponential backoff`_ to avoid overwhelming the
+service with your requests. Fortunately, Celery's automatic retry support
+makes it easy. Just specify the :attr:`~Task.retry_backoff` argument, like this:
+
+.. code-block:: python
+
+    from requests.exceptions import RequestException
+
+    @app.task(autoretry_for=(RequestException,), retry_backoff=True)
+    def x():
+        ...
+
+By default, this exponential backoff will also introduce random jitter_ to
+avoid having all the tasks run at the same moment. It will also cap the
+maximum backoff delay to 10 minutes. All these settings can be customized
+via options documented below.
+
+.. attribute:: Task.autoretry_for
+
+    A list/tuple of exception classes. If any of these exceptions are raised
+    during the execution of the task, the task will automatically be retried.
+    By default, no exceptions will be autoretried.
+
+.. attribute:: Task.retry_kwargs
+
+    A dictionary. Use this to customize how autoretries are executed.
+    Note that if you use the exponential backoff options below, the ``countdown``
+    task option will be determined by Celery's autoretry system, and any
+    ``countdown`` included in this dictionary will be ignored.
+
+.. attribute:: Task.retry_backoff
+
+    A boolean, or a number. If this option is set to ``True``, autoretries
+    will be delayed following the rules of `exponential backoff`_. The first
+    retry will have a delay of 1 second, the second retry will have a delay
+    of 2 seconds, the third will delay 4 seconds, the fourth will delay 8
+    seconds, and so on. (However, this delay value is modified by
+    :attr:`~Task.retry_jitter`, if it is enabled.)
+    If this option is set to a number, it is used as a
+    delay factor. For example, if this option is set to ``3``, the first retry
+    will delay 3 seconds, the second will delay 6 seconds, the third will
+    delay 12 seconds, the fourth will delay 24 seconds, and so on. By default,
+    this option is set to ``False``, and autoretries will not be delayed.
+
+.. attribute:: Task.retry_backoff_max
+
+    A number. If ``retry_backoff`` is enabled, this option will set a maximum
+    delay in seconds between task autoretries. By default, this option is set to ``600``,
+    which is 10 minutes.
+
+.. attribute:: Task.retry_jitter
+
+    A boolean. `Jitter`_ is used to introduce randomness into
+    exponential backoff delays, to prevent all tasks in the queue from being
+    executed simultaneously. If this option is set to ``True``, the delay
+    value calculated by :attr:`~Task.retry_backoff` is treated as a maximum,
+    and the actual delay value will be a random number between zero and that
+    maximum. By default, this option is set to ``True``.
+
 .. _task-options:
 
 List of Options
@@ -1899,3 +1961,5 @@ To make API calls to `Akismet`_ I use the `akismet.py`_ library written by
 .. _`Akismet`: http://akismet.com/faq/
 .. _`akismet.py`: http://www.voidspace.org.uk/downloads/akismet.py
 .. _`Michael Foord`: http://www.voidspace.org.uk/
+.. _`exponential backoff`: https://en.wikipedia.org/wiki/Exponential_backoff
+.. _`jitter`: https://en.wikipedia.org/wiki/Jitter

+ 58 - 0
t/unit/tasks/test_tasks.py

@@ -2,9 +2,15 @@ from __future__ import absolute_import, unicode_literals
 
 import pytest
 import socket
+import tempfile
 
 from datetime import datetime, timedelta
 
+try:
+    from urllib.error import HTTPError
+except ImportError:  # pragma: no cover
+    from urllib2 import HTTPError
+
 from case import ContextMock, MagicMock, Mock, patch
 from kombu import Queue
 
@@ -120,6 +126,28 @@ class TasksCase:
 
         self.autoretry_task = autoretry_task
 
+        @self.app.task(bind=True, autoretry_for=(HTTPError,),
+                       retry_backoff=True, shared=False)
+        def autoretry_backoff_task(self, url):
+            self.iterations += 1
+            if "error" in url:
+                fp = tempfile.TemporaryFile()
+                raise HTTPError(url, '500', 'Error', '', fp)
+            return url
+
+        self.autoretry_backoff_task = autoretry_backoff_task
+
+        @self.app.task(bind=True, autoretry_for=(HTTPError,),
+                       retry_backoff=True, retry_jitter=True, shared=False)
+        def autoretry_backoff_jitter_task(self, url):
+            self.iterations += 1
+            if "error" in url:
+                fp = tempfile.TemporaryFile()
+                raise HTTPError(url, '500', 'Error', '', fp)
+            return url
+
+        self.autoretry_backoff_jitter_task = autoretry_backoff_jitter_task
+
         @self.app.task(bind=True)
         def task_check_request_context(self):
             assert self.request.hostname == socket.gethostname()
@@ -251,6 +279,36 @@ class test_task_retries(TasksCase):
         self.autoretry_task.apply((1, 0))
         assert self.autoretry_task.iterations == 6
 
+    # The stub returns ``i - 1``, the largest value ``randrange(i)`` can
+    # yield.  The backoff helper calls ``randrange(countdown + 1)``, so the
+    # jittered delay equals the un-jittered one and the test is deterministic.
+    @patch('random.randrange', side_effect=lambda i: i - 1)
+    def test_autoretry_backoff(self, randrange):
+        """Autoretry with ``retry_backoff=True`` doubles the countdown."""
+        task = self.autoretry_backoff_task
+        task.max_retries = 3
+        task.iterations = 0
+
+        # Wrap (rather than replace) ``retry`` so the real retry logic still
+        # runs while every call's keyword arguments are recorded.
+        with patch.object(task, 'retry', wraps=task.retry) as fake_retry:
+            # The URL contains "error", so the task body raises HTTPError
+            # on every run.
+            task.apply(("http://httpbin.org/error",))
+
+        # One initial run plus ``max_retries`` retries.
+        assert task.iterations == 4
+        # ``call[1]`` is the kwargs dict of each recorded ``retry`` call.
+        retry_call_countdowns = [
+            call[1]['countdown'] for call in fake_retry.call_args_list
+        ]
+        assert retry_call_countdowns == [1, 2, 4, 8]
+
+    # The stub returns ``i - 2``, one below the largest value ``randrange(i)``
+    # can yield.  The backoff helper calls ``randrange(countdown + 1)``, so
+    # each delay comes out exactly one second below the un-jittered value.
+    @patch('random.randrange', side_effect=lambda i: i - 2)
+    def test_autoretry_backoff_jitter(self, randrange):
+        """Autoretry with ``retry_jitter=True`` uses the randomized delay."""
+        task = self.autoretry_backoff_jitter_task
+        task.max_retries = 3
+        task.iterations = 0
+
+        # Wrap (rather than replace) ``retry`` so the real retry logic still
+        # runs while every call's keyword arguments are recorded.
+        with patch.object(task, 'retry', wraps=task.retry) as fake_retry:
+            # The URL contains "error", so the task body raises HTTPError
+            # on every run.
+            task.apply(("http://httpbin.org/error",))
+
+        # One initial run plus ``max_retries`` retries.
+        assert task.iterations == 4
+        # ``call[1]`` is the kwargs dict of each recorded ``retry`` call.
+        retry_call_countdowns = [
+            call[1]['countdown'] for call in fake_retry.call_args_list
+        ]
+        # Un-jittered delays would be [1, 2, 4, 8]; the stub shaves one
+        # second off each.
+        assert retry_call_countdowns == [0, 1, 3, 7]
+
     def test_retry_wrong_eta_when_not_enable_utc(self):
         """Issue #3753"""
         self.app.conf.enable_utc = False

+ 38 - 1
t/unit/utils/test_time.py

@@ -3,7 +3,7 @@ import pytest
 import pytz
 from datetime import datetime, timedelta, tzinfo
 from pytz import AmbiguousTimeError
-from case import Mock
+from case import Mock, patch
 from celery.utils.time import (
     delta_resolution,
     humanize_seconds,
@@ -18,6 +18,7 @@ from celery.utils.time import (
     LocalTimezone,
     ffwd,
     utcoffset,
+    get_exponential_backoff_interval,
 )
 from celery.utils.iso8601 import parse_iso8601
 
@@ -253,3 +254,39 @@ class test_utcoffset:
         assert utcoffset(time=_time) is not None
         _time.daylight = False
         assert utcoffset(time=_time) is not None
+
+
+class test_get_exponential_backoff_interval:
+    """Unit tests for ``get_exponential_backoff_interval``."""
+
+    # The stub returns one less than the largest value ``randrange(n)`` can
+    # yield, which makes the jittered result deterministic.
+    @patch('random.randrange', lambda n: n - 2)
+    def test_with_jitter(self):
+        assert get_exponential_backoff_interval(
+            factor=4,
+            retries=3,
+            maximum=100,
+            full_jitter=True
+        ) == 4 * (2 ** 3) - 1
+
+    def test_without_jitter(self):
+        assert get_exponential_backoff_interval(
+            factor=4,
+            retries=3,
+            maximum=100,
+            full_jitter=False
+        ) == 4 * (2 ** 3)
+
+    def test_bound_by_maximum(self):
+        maximum_boundary = 100
+        assert get_exponential_backoff_interval(
+            factor=40,
+            retries=3,
+            maximum=maximum_boundary
+        ) == maximum_boundary
+
+    # The stub returns the largest value ``randrange(n)`` can yield.
+    @patch('random.randrange', lambda n: n - 1)
+    def test_negative_values(self):
+        # Check both settings: without ``full_jitter=True`` the function
+        # never calls ``random.randrange()``, so the stub above would be
+        # dead and the jittered path for negative input would go untested.
+        for full_jitter in (False, True):
+            assert get_exponential_backoff_interval(
+                factor=-40,
+                retries=3,
+                maximum=100,
+                full_jitter=full_jitter
+            ) == 0