ソースを参照

Adding redis sentinel backend (#4144)

* Adding redis sentinel backend

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Addressed flake8 issues

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Updating pydoc style to match requirements

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Fixing unit tests on Appveyor by setting up a fake sentinel module inside the SentinelBackend class

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Redefining the fake Sentinel class to use correct parameters when calling 'master_for'

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Updated configuration name, 'result_backend_transpoirt_options'

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Added documentation for the feature

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Make sure the database/password parameters are forwarded to the StrictRedis class while instantiating a Sentinel

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Database/password parameters should be sent as connection_kwargs to the Sentinel class, to be used when connection to redis is created

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Fixing flake8 issues

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Addressing configcheck issues

Signed-off-by: Geoffrey Bauduin <geoffrey.bauduin@corp.ovh.com>

* Add docstring to SentinelBackend.

* Fix flake8 error.
Geoffrey Bauduin 7 年 前
コミット
7d52b0bddf

+ 1 - 0
celery/app/backends.py

@@ -19,6 +19,7 @@ BACKEND_ALIASES = {
     'rpc': 'celery.backends.rpc.RPCBackend',
     'cache': 'celery.backends.cache:CacheBackend',
     'redis': 'celery.backends.redis:RedisBackend',
+    'sentinel': 'celery.backends.redis:SentinelBackend',
     'mongodb': 'celery.backends.mongodb:MongoBackend',
     'db': 'celery.backends.database:DatabaseBackend',
     'database': 'celery.backends.database:DatabaseBackend',

+ 1 - 0
celery/app/defaults.py

@@ -179,6 +179,7 @@ NAMESPACES = Namespace(
         ),
         persistent=Option(None, type='bool'),
         serializer=Option('json'),
+        backend_transport_options=Option({}, type='dict'),
     ),
     elasticsearch=Namespace(
         __old__=old_ns('celery_elasticsearch'),

+ 91 - 16
celery/backends/redis.py

@@ -24,24 +24,33 @@ from . import base
 try:
     import redis
     from kombu.transport.redis import get_redis_error_classes
-except ImportError:                 # pragma: no cover
-    redis = None                    # noqa
+except ImportError:  # pragma: no cover
+    redis = None  # noqa
     get_redis_error_classes = None  # noqa
 
-__all__ = ('RedisBackend',)
+try:
+    from redis import sentinel
+except ImportError:
+    sentinel = None
+
+__all__ = ('RedisBackend', 'SentinelBackend')
 
 E_REDIS_MISSING = """
 You need to install the redis library in order to use \
 the Redis result store backend.
 """
 
+E_REDIS_SENTINEL_MISSING = """
+You need to install the redis library with support of \
+sentinel in order to use the Redis result store backend.
+"""
+
 E_LOST = 'Connection to Redis lost: Retry (%s/%s) %s.'
 
 logger = get_logger(__name__)
 
 
 class ResultConsumer(async.BaseResultConsumer):
-
     _pubsub = None
 
     def __init__(self, *args, **kwargs):
@@ -265,12 +274,12 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
         tkey = self.get_key_for_group(gid, '.t')
         result = self.encode_result(result, state)
         with client.pipeline() as pipe:
-            _, readycount, totaldiff, _, _ = pipe                           \
-                .rpush(jkey, self.encode([1, tid, state, result]))          \
-                .llen(jkey)                                                 \
-                .get(tkey)                                                  \
-                .expire(jkey, self.expires)                                 \
-                .expire(tkey, self.expires)                                 \
+            _, readycount, totaldiff, _, _ = pipe \
+                .rpush(jkey, self.encode([1, tid, state, result])) \
+                .llen(jkey) \
+                .get(tkey) \
+                .expire(jkey, self.expires) \
+                .expire(tkey, self.expires) \
                 .execute()
 
         totaldiff = int(totaldiff or 0)
@@ -281,10 +290,10 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
             if readycount == total:
                 decode, unpack = self.decode, self._unpack_chord_result
                 with client.pipeline() as pipe:
-                    resl, _, _ = pipe               \
-                        .lrange(jkey, 0, total)     \
-                        .delete(jkey)               \
-                        .delete(tkey)               \
+                    resl, _, _ = pipe \
+                        .lrange(jkey, 0, total) \
+                        .delete(jkey) \
+                        .delete(tkey) \
                         .execute()
                 try:
                     callback.delay([unpack(tup, decode) for tup in resl])
@@ -306,10 +315,16 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
             )
 
     def _create_client(self, **params):
-        return self.redis.StrictRedis(
-            connection_pool=self.ConnectionPool(**params),
+        return self._get_client()(
+            connection_pool=self._get_pool(**params),
         )
 
+    def _get_client(self):
+        return self.redis.StrictRedis
+
+    def _get_pool(self, **params):
+        return self.ConnectionPool(**params)
+
     @property
     def ConnectionPool(self):
         if self._ConnectionPool is None:
@@ -340,3 +355,63 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
     @deprecated.Property(4.0, 5.0)
     def password(self):
         return self.connparams['password']
+
+
+class SentinelBackend(RedisBackend):
+    """Redis sentinel task result store."""
+
+    sentinel = sentinel
+
+    def __init__(self, *args, **kwargs):
+        if self.sentinel is None:
+            raise ImproperlyConfigured(E_REDIS_SENTINEL_MISSING.strip())
+
+        super(SentinelBackend, self).__init__(*args, **kwargs)
+
+    def _params_from_url(self, url, defaults):
+        # URL looks like sentinel://0.0.0.0:26347/3;sentinel://0.0.0.0:26348/3.
+        chunks = url.split(";")
+        connparams = dict(defaults, hosts=[])
+        for chunk in chunks:
+            data = super(SentinelBackend, self)._params_from_url(
+                url=chunk, defaults=defaults)
+            connparams['hosts'].append(data)
+        for p in ("host", "port", "db", "password"):
+            connparams.pop(p)
+
+        # Adding db/password in connparams to connect to the correct instance
+        for p in ("db", "password"):
+            if connparams['hosts'] and p in connparams['hosts'][0]:
+                connparams[p] = connparams['hosts'][0].get(p)
+        return connparams
+
+    def _get_sentinel_instance(self, **params):
+        connparams = params.copy()
+
+        hosts = connparams.pop("hosts")
+        result_backend_transport_opts = self.app.conf.get(
+            "result_backend_transport_options", {})
+        min_other_sentinels = result_backend_transport_opts.get(
+            "min_other_sentinels", 0)
+        sentinel_kwargs = result_backend_transport_opts.get(
+            "sentinel_kwargs", {})
+
+        sentinel_instance = self.sentinel.Sentinel(
+            [(cp['host'], cp['port']) for cp in hosts],
+            min_other_sentinels=min_other_sentinels,
+            sentinel_kwargs=sentinel_kwargs,
+            **connparams)
+
+        return sentinel_instance
+
+    def _get_pool(self, **params):
+        sentinel_instance = self._get_sentinel_instance(**params)
+
+        result_backend_transport_opts = self.app.conf.get(
+            "result_backend_transport_options", {})
+        master_name = result_backend_transport_opts.get("master_name", None)
+
+        return sentinel_instance.master_for(
+            service_name=master_name,
+            redis_class=self._get_client(),
+        ).connection_pool

+ 7 - 2
docs/getting-started/brokers/redis.rst

@@ -86,9 +86,14 @@ you should configure these settings::
     app.conf.result_backend = 'redis://localhost:6379/0'
 
 For a complete list of options supported by the Redis result backend, see
-:ref:`conf-redis-result-backend`
+:ref:`conf-redis-result-backend`.
+
+If you are using Sentinel, you should specify the master_name using the :setting:`result_backend_transport_options` setting:
+
+.. code-block:: python
+
+    app.conf.result_backend_transport_options = {'master_name': "mymaster"}
 
-*Note that sentinel is not a supported backend*
 
 .. _redis-caveats:
 

+ 21 - 0
docs/userguide/configuration.rst

@@ -593,6 +593,27 @@ Can be one of the following:
 .. _`Couchbase`: https://www.couchbase.com/
 .. _`Consul`: https://consul.io/
 
+
+.. setting:: result_backend_transport_options
+
+``result_backend_transport_options``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Default: ``{}`` (empty mapping).
+
+A dict of additional options passed to the underlying transport.
+
+See your transport user manual for supported options (if any).
+
+Example setting the visibility timeout (supported by Redis and SQS
+transports):
+
+.. code-block:: python
+
+    result_backend_transport_options = {'visibility_timeout': 18000}  # 5 hours
+
+
+
 .. setting:: result_serializer
 
 ``result_serializer``

+ 103 - 6
t/unit/backends/test_redis.py

@@ -1,6 +1,7 @@
 from __future__ import absolute_import, unicode_literals
 import pytest
 import ssl
+import random
 from datetime import timedelta
 from contextlib import contextmanager
 from pickle import loads, dumps
@@ -16,10 +17,10 @@ from celery.utils.collections import AttributeDict
 
 
 def raise_on_second_call(mock, exc, *retval):
-
     def on_first_call(*args, **kwargs):
         mock.side_effect = exc
         return mock.return_value
+
     mock.side_effect = on_first_call
     if retval:
         mock.return_value, = retval
@@ -33,16 +34,15 @@ class Connection(object):
 
 
 class Pipeline(object):
-
     def __init__(self, client):
         self.client = client
         self.steps = []
 
     def __getattr__(self, attr):
-
         def add_step(*args, **kwargs):
             self.steps.append((getattr(self.client, attr), args, kwargs))
             return self
+
         return add_step
 
     def __enter__(self):
@@ -105,22 +105,36 @@ class Redis(mock.MockCallbacks):
         return len(self.keyspace.get(key) or [])
 
 
+class Sentinel(mock.MockCallbacks):
+    def __init__(self, sentinels, min_other_sentinels=0, sentinel_kwargs=None,
+                 **connection_kwargs):
+        self.sentinel_kwargs = sentinel_kwargs
+        self.sentinels = [Redis(hostname, port, **self.sentinel_kwargs)
+                          for hostname, port in sentinels]
+        self.min_other_sentinels = min_other_sentinels
+        self.connection_kwargs = connection_kwargs
+
+    def master_for(self, service_name, redis_class):
+        return random.choice(self.sentinels)
+
+
 class redis(object):
     StrictRedis = Redis
 
     class ConnectionPool(object):
-
         def __init__(self, **kwargs):
             pass
 
     class UnixDomainSocketConnection(object):
-
         def __init__(self, **kwargs):
             pass
 
 
-class test_RedisBackend:
+class sentinel(object):
+    Sentinel = Sentinel
+
 
+class test_RedisBackend:
     def get_backend(self):
         from celery.backends.redis import RedisBackend
 
@@ -418,3 +432,86 @@ class test_RedisBackend:
         self.b.client.expire.assert_called_with(
             key, 512,
         )
+
+
+class test_SentinelBackend:
+    def get_backend(self):
+        from celery.backends.redis import SentinelBackend
+
+        class _SentinelBackend(SentinelBackend):
+            redis = redis
+            sentinel = sentinel
+
+        return _SentinelBackend
+
+    def get_E_LOST(self):
+        from celery.backends.redis import E_LOST
+        return E_LOST
+
+    def setup(self):
+        self.Backend = self.get_backend()
+        self.E_LOST = self.get_E_LOST()
+        self.b = self.Backend(app=self.app)
+
+    @pytest.mark.usefixtures('depends_on_current_app')
+    @skip.unless_module('redis')
+    def test_reduce(self):
+        from celery.backends.redis import SentinelBackend
+        x = SentinelBackend(app=self.app)
+        assert loads(dumps(x))
+
+    def test_no_redis(self):
+        self.Backend.redis = None
+        with pytest.raises(ImproperlyConfigured):
+            self.Backend(app=self.app)
+
+    def test_url(self):
+        self.app.conf.redis_socket_timeout = 30.0
+        self.app.conf.redis_socket_connect_timeout = 100.0
+        x = self.Backend(
+            'sentinel://:test@github.com:123/1;'
+            'sentinel://:test@github.com:124/1',
+            app=self.app,
+        )
+        assert x.connparams
+        assert "host" not in x.connparams
+        assert x.connparams['db'] == 1
+        assert "port" not in x.connparams
+        assert x.connparams['password'] == "test"
+        assert len(x.connparams['hosts']) == 2
+        expected_hosts = ["github.com", "github.com"]
+        found_hosts = [cp['host'] for cp in x.connparams['hosts']]
+        assert found_hosts == expected_hosts
+
+        expected_ports = [123, 124]
+        found_ports = [cp['port'] for cp in x.connparams['hosts']]
+        assert found_ports == expected_ports
+
+        expected_passwords = ["test", "test"]
+        found_passwords = [cp['password'] for cp in x.connparams['hosts']]
+        assert found_passwords == expected_passwords
+
+        expected_dbs = [1, 1]
+        found_dbs = [cp['db'] for cp in x.connparams['hosts']]
+        assert found_dbs == expected_dbs
+
+    def test_get_sentinel_instance(self):
+        x = self.Backend(
+            'sentinel://:test@github.com:123/1;'
+            'sentinel://:test@github.com:124/1',
+            app=self.app,
+        )
+        sentinel_instance = x._get_sentinel_instance(**x.connparams)
+        assert sentinel_instance.sentinel_kwargs == {}
+        assert sentinel_instance.connection_kwargs['db'] == 1
+        assert sentinel_instance.connection_kwargs['password'] == "test"
+        assert len(sentinel_instance.sentinels) == 2
+
+    def test_get_pool(self):
+        x = self.Backend(
+            'sentinel://:test@github.com:123/1;'
+            'sentinel://:test@github.com:124/1',
+            app=self.app,
+        )
+        pool = x._get_pool(**x.connparams)
+        assert pool