浏览代码

Chord counter to use result_expires and is touched (#3573)

Chord counter keeps track of chord tasks that have finished. This needs
to be compatible with result_expires so that if a chord task result is
still in cache, chord will be able finish and join (unlock). Otherwise
we see that the chord counter has expired while the chord task result
still in cache. Incrementing an expired chord counter returns None and
comparing that to dependent task number throws an error.

Counter's timeout also needs to be refreshed on every chord part return.
Memcached backend didn't implement `expire` using client's `touch`, so
this is implemented here as well.
Tayfun Sen 8 年之前
父节点
当前提交
4c4f5d8dd0
共有 3 个文件被更改,包括 10 次插入4 次删除
  1. 1 1
      celery/backends/base.py
  2. 7 1
      celery/backends/cache.py
  3. 2 2
      celery/backends/redis.py

+ 1 - 1
celery/backends/base.py

@@ -750,7 +750,7 @@ class BaseKeyValueStoreBackend(Backend):
                 deps.delete()
                 self.client.delete(key)
         else:
-            self.expire(key, 86400)
+            self.expire(key, self.expires)
 
 
 class KeyValueStoreBackend(BaseKeyValueStoreBackend, SyncBackendMixin):

+ 7 - 1
celery/backends/cache.py

@@ -76,6 +76,9 @@ class DummyClient(object):
     def incr(self, key, delta=1):
         return self.cache.incr(key, delta)
 
+    def touch(self, key, expire):
+        pass
+
 
 backends = {
     'memcache': get_best_memcache,
@@ -126,13 +129,16 @@ class CacheBackend(KeyValueStoreBackend):
         return self.client.delete(key)
 
     def _apply_chord_incr(self, header, partial_args, group_id, body, **opts):
-        self.client.set(self.get_key_for_chord(group_id), 0, time=86400)
+        self.client.set(self.get_key_for_chord(group_id), 0, time=self.expires)
         return super(CacheBackend, self)._apply_chord_incr(
             header, partial_args, group_id, body, **opts)
 
     def incr(self, key):
         return self.client.incr(key)
 
+    def expire(self, key, value):
+        return self.client.touch(key, value)
+
     @cached_property
     def client(self):
         return self.Client(self.servers, **self.options)

+ 2 - 2
celery/backends/redis.py

@@ -254,8 +254,8 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
                 .rpush(jkey, self.encode([1, tid, state, result]))          \
                 .llen(jkey)                                                 \
                 .get(tkey)                                                  \
-                .expire(jkey, 86400)                                        \
-                .expire(tkey, 86400)                                        \
+                .expire(jkey, self.expires)                                 \
+                .expire(tkey, self.expires)                                 \
                 .execute()
 
         totaldiff = int(totaldiff or 0)