Przeglądaj źródła

Redis result backend: Adds support for a max_connection parameter.

This enables the possibility to configure the maximum number of
simultaneous connections used for results in the redis connection pool.

The default max connections setting can be configured using the
:setting:`CELERY_REDIS_MAX_CONNECTIONS` setting,
or it can be changed individually by ``RedisBackend(max_connections=int)``.

Closes #467
Steeve Morin 13 lat temu
rodzic
commit
6c1d16707c
1 zmienionych plików z 9 dodań i 5 usunięć
  1. 9 5
      celery/backends/redis.py

+ 9 - 5
celery/backends/redis.py

@@ -35,7 +35,7 @@ class RedisBackend(KeyValueStoreBackend):
     supports_native_join = True
     supports_native_join = True
 
 
     def __init__(self, host=None, port=None, db=None, password=None,
     def __init__(self, host=None, port=None, db=None, password=None,
-            expires=None, **kwargs):
+            expires=None, max_connections=None, **kwargs):
         super(RedisBackend, self).__init__(**kwargs)
         super(RedisBackend, self).__init__(**kwargs)
         conf = self.app.conf
         conf = self.app.conf
         if self.redis is None:
         if self.redis is None:
@@ -56,7 +56,8 @@ class RedisBackend(KeyValueStoreBackend):
         self.db = db or _get("DB") or self.db
         self.db = db or _get("DB") or self.db
         self.password = password or _get("PASSWORD") or self.password
         self.password = password or _get("PASSWORD") or self.password
         self.expires = self.prepare_expires(expires, type=int)
         self.expires = self.prepare_expires(expires, type=int)
-
+        self.max_connections = max_connections or _get("MAX_CONNECTIONS") \
+                               or self.max_connections
     def get(self, key):
     def get(self, key):
         return self.client.get(key)
         return self.client.get(key)
 
 
@@ -92,8 +93,10 @@ class RedisBackend(KeyValueStoreBackend):
 
 
     @cached_property
     @cached_property
     def client(self):
     def client(self):
-        return self.redis.Redis(host=self.host, port=self.port,
-                                db=self.db, password=self.password)
+        pool = self.redis.ConnectionPool(host=self.host, port=self.port,
+                                         db=self.db, password=self.password,
+                                         max_connections=self.max_connections)
+        return self.redis.Redis(connection_pool=pool)
 
 
     def __reduce__(self, args=(), kwargs={}):
     def __reduce__(self, args=(), kwargs={}):
         kwargs.update(
         kwargs.update(
@@ -101,5 +104,6 @@ class RedisBackend(KeyValueStoreBackend):
                  port=self.port,
                  port=self.port,
                  db=self.db,
                  db=self.db,
                  password=self.password,
                  password=self.password,
-                 expires=self.expires))
+                 expires=self.expires,
+                 max_connections=self.max_connections))
         return super(RedisBackend, self).__reduce__(args, kwargs)
         return super(RedisBackend, self).__reduce__(args, kwargs)