소스 검색

Redis result backend: Now retries writing the result.

Ask Solem 11 년 전
부모
커밋
590cfc37c9
2개의 변경된 파일42개의 추가작업 그리고 5개의 파일을 삭제
  1. 7 0
      celery/backends/base.py
  2. 35 5
      celery/backends/redis.py

+ 7 - 0
celery/backends/base.py

@@ -73,6 +73,13 @@ class BaseBackend(object):
     #: Set to true if the backend is peristent by default.
     persistent = True
 
+    retry_policy = {
+        'max_retries': 20,
+        'interval_start': 0,
+        'interval_step': 1,
+        'interval_max': 1,
+    }
+
     def __init__(self, app, serializer=None,
                  max_cached_results=None, accept=None, **kwargs):
         self.app = app

+ 35 - 5
celery/backends/redis.py

@@ -8,22 +8,28 @@
 """
 from __future__ import absolute_import
 
-from kombu.utils import cached_property
+from functools import partial
+
+from kombu.utils import cached_property, retry_over_time
 from kombu.utils.url import _parse_url
 
 from celery.exceptions import ImproperlyConfigured
 from celery.five import string_t
 from celery.utils import deprecated_property
 from celery.utils.functional import dictfilter
+from celery.utils.log import get_logger
+from celery.utils.timeutils import humanize_seconds
 
 from .base import KeyValueStoreBackend
 
 try:
     import redis
     from redis.exceptions import ConnectionError
-except ImportError:         # pragma: no cover
-    redis = None            # noqa
-    ConnectionError = None  # noqa
+    from kombu.transport.redis import get_redis_error_classes
+except ImportError:                 # pragma: no cover
+    redis = None                    # noqa
+    ConnectionError = None          # noqa
+    get_redis_error_classes = None  # noqa
 
 __all__ = ['RedisBackend']
 
@@ -31,6 +37,9 @@ REDIS_MISSING = """\
 You need to install the redis library in order to use \
 the Redis result store backend."""
 
+logger = get_logger(__name__)
+error = logger.error
+
 
 class RedisBackend(KeyValueStoreBackend):
     """Redis task result store."""
@@ -79,6 +88,8 @@ class RedisBackend(KeyValueStoreBackend):
         self.url = url
         self.expires = self.prepare_expires(expires, type=int)
 
+        self.connection_errors, self.channel_errors = get_redis_error_classes()
+
     def _params_from_url(self, url, defaults):
         scheme, host, port, user, password, path, query = _parse_url(url)
         connparams = dict(
@@ -115,7 +126,26 @@ class RedisBackend(KeyValueStoreBackend):
     def mget(self, keys):
         return self.client.mget(keys)
 
-    def set(self, key, value):
+    def ensure(self, fun, args, **policy):
+        retry_policy = dict(self.retry_policy, **policy)
+        max_retries = retry_policy.get('max_retries')
+        return retry_over_time(
+            fun, self.connection_errors, args, {},
+            partial(self.on_connection_error, max_retries),
+            **retry_policy
+        )
+
+    def on_connection_error(self, max_retries, exc, intervals, retries):
+        tts = next(intervals)
+        error('Connection to Redis lost: Retry (%s/%s) %s.',
+              retries, max_retries or 'Inf',
+              humanize_seconds(tts, 'in '))
+        return tts
+
+    def set(self, key, value, **retry_policy):
+        return self.ensure(self._set, (key, value), **retry_policy)
+
+    def _set(self, key, value):
         client = self.client
         if self.expires:
             client.setex(key, value, self.expires)