Jelajahi Sumber

Merge branch 'reversefold/master'

Ask Solem 9 tahun lalu
induk
melakukan
88d9d2f75e
3 mengubah file dengan 32 tambahan dan 19 penghapusan
  1. 1 0
      CONTRIBUTORS.txt
  2. 25 19
      celery/backends/redis.py
  3. 6 0
      celery/tests/backends/test_redis.py

+ 1 - 0
CONTRIBUTORS.txt

@@ -190,5 +190,6 @@ Alexander Lebedev, 2015/04/25
 Frantisek Holop, 2015/05/21
 Feanil Patel, 2015/05/21
 Jocelyn Delalande, 2015/06/03
+Justin Patrin, 2015/08/06
 Juan Rossi, 2015/08/10
 Piotr Maślanka, 2015/08/24

+ 25 - 19
celery/backends/redis.py

@@ -130,6 +130,10 @@ class RedisBackend(KeyValueStoreBackend):
         db = db.strip('/') if isinstance(db, string_t) else db
         connparams['db'] = int(db)
 
+        for key in ['socket_timeout', 'socket_connect_timeout']:
+            if key in query:
+                query[key] = float(query[key])
+
         # Query parameters override other parameters
         connparams.update(query)
         return connparams
@@ -160,13 +164,13 @@ class RedisBackend(KeyValueStoreBackend):
         return self.ensure(self._set, (key, value), **retry_policy)
 
     def _set(self, key, value):
-        pipe = self.client.pipeline()
-        if self.expires:
-            pipe.setex(key, value, self.expires)
-        else:
-            pipe.set(key, value)
-        pipe.publish(key, value)
-        pipe.execute()
+        with self.client.pipeline() as pipe:
+            if self.expires:
+                pipe.setex(key, value, self.expires)
+            else:
+                pipe.set(key, value)
+            pipe.publish(key, value)
+            pipe.execute()
 
     def delete(self, key):
         self.client.delete(key)
@@ -207,13 +211,14 @@ class RedisBackend(KeyValueStoreBackend):
         jkey = self.get_key_for_group(gid, '.j')
         tkey = self.get_key_for_group(gid, '.t')
         result = self.encode_result(result, state)
-        _, readycount, totaldiff, _, _ = client.pipeline()              \
-            .rpush(jkey, self.encode([1, tid, state, result]))          \
-            .llen(jkey)                                                 \
-            .get(tkey)                                                  \
-            .expire(jkey, 86400)                                        \
-            .expire(tkey, 86400)                                        \
-            .execute()
+        with client.pipeline() as pipe:
+            _, readycount, totaldiff, _, _ = pipe                           \
+                .rpush(jkey, self.encode([1, tid, state, result]))          \
+                .llen(jkey)                                                 \
+                .get(tkey)                                                  \
+                .expire(jkey, 86400)                                        \
+                .expire(tkey, 86400)                                        \
+                .execute()
 
         totaldiff = int(totaldiff or 0)
 
@@ -222,11 +227,12 @@ class RedisBackend(KeyValueStoreBackend):
             total = callback['chord_size'] + totaldiff
             if readycount == total:
                 decode, unpack = self.decode, self._unpack_chord_result
-                resl, _, _ = client.pipeline()  \
-                    .lrange(jkey, 0, total)     \
-                    .delete(jkey)               \
-                    .delete(tkey)               \
-                    .execute()
+                with client.pipeline() as pipe:
+                    resl, _, _ = pipe               \
+                        .lrange(jkey, 0, total)     \
+                        .delete(jkey)               \
+                        .delete(tkey)               \
+                        .execute()
                 try:
                     callback.delay([unpack(tup, decode) for tup in resl])
                 except Exception as exc:

+ 6 - 0
celery/tests/backends/test_redis.py

@@ -37,6 +37,12 @@ class Pipeline(object):
             return self
         return add_step
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        pass
+
     def execute(self):
         return [step(*a, **kw) for step, a, kw in self.steps]