Kaynağa Gözat

Use all redis pipelines as context managers to ensure that they are always cleaned up properly, especially in the case of exceptions

Justin Patrin 10 yıl önce
ebeveyn
işleme
efd13231e6
1 değiştirilmiş dosya ile 18 ekleme ve 16 silme
  1. 18 16
      celery/backends/redis.py

+ 18 - 16
celery/backends/redis.py

@@ -164,13 +164,13 @@ class RedisBackend(KeyValueStoreBackend):
         return self.ensure(self._set, (key, value), **retry_policy)
 
     def _set(self, key, value):
-        pipe = self.client.pipeline()
-        if self.expires:
-            pipe.setex(key, value, self.expires)
-        else:
-            pipe.set(key, value)
-        pipe.publish(key, value)
-        pipe.execute()
+        with self.client.pipeline() as pipe:
+            if self.expires:
+                pipe.setex(key, value, self.expires)
+            else:
+                pipe.set(key, value)
+            pipe.publish(key, value)
+            pipe.execute()
 
     def delete(self, key):
         self.client.delete(key)
@@ -209,21 +209,23 @@ class RedisBackend(KeyValueStoreBackend):
         client = self.client
         jkey = self.get_key_for_group(gid, '.j')
         result = self.encode_result(result, state)
-        _, readycount, _ = client.pipeline()                            \
-            .rpush(jkey, self.encode([1, tid, state, result]))          \
-            .llen(jkey)                                                 \
-            .expire(jkey, 86400)                                        \
-            .execute()
+        with client.pipeline() as pipe:
+            _, readycount, _ = pipe                                         \
+                .rpush(jkey, self.encode([1, tid, state, result]))          \
+                .llen(jkey)                                                 \
+                .expire(jkey, 86400)                                        \
+                .execute()
 
         try:
             callback = maybe_signature(request.chord, app=app)
             total = callback['chord_size']
             if readycount == total:
                 decode, unpack = self.decode, self._unpack_chord_result
-                resl, _ = client.pipeline()     \
-                    .lrange(jkey, 0, total)     \
-                    .delete(jkey)               \
-                    .execute()
+                with client.pipeline() as pipe:
+                    resl, _, = pipe                 \
+                        .lrange(jkey, 0, total)     \
+                        .delete(jkey)               \
+                        .execute()
                 try:
                     callback.delay([unpack(tup, decode) for tup in resl])
                 except Exception as exc: