|
@@ -1,9 +1,12 @@
|
|
|
import warnings
|
|
|
+from datetime import timedelta
|
|
|
|
|
|
|
|
|
+from celery import conf
|
|
|
from celery.loaders import load_settings
|
|
|
from celery.backends.base import KeyValueStoreBackend
|
|
|
from celery.exceptions import ImproperlyConfigured
|
|
|
+from celery.utils import timeutils
|
|
|
|
|
|
try:
|
|
|
import redis
|
|
@@ -34,6 +37,7 @@ class RedisBackend(KeyValueStoreBackend):
|
|
|
redis_password = None
|
|
|
redis_timeout = None
|
|
|
redis_connect_retry = None
|
|
|
+ expires = None
|
|
|
|
|
|
deprecated_settings = frozenset(["REDIS_TIMEOUT",
|
|
|
"REDIS_CONNECT_RETRY"])
|
|
@@ -42,7 +46,8 @@ class RedisBackend(KeyValueStoreBackend):
|
|
|
redis_timeout=None,
|
|
|
redis_password=None,
|
|
|
redis_connect_retry=None,
|
|
|
- redis_connect_timeout=None):
|
|
|
+ redis_connect_timeout=None,
|
|
|
+ expires=None):
|
|
|
if redis is None:
|
|
|
raise ImproperlyConfigured(
|
|
|
"You need to install the redis library in order to use "
|
|
@@ -58,6 +63,13 @@ class RedisBackend(KeyValueStoreBackend):
|
|
|
self.redis_password = redis_password or \
|
|
|
getattr(settings, "REDIS_PASSWORD",
|
|
|
self.redis_password)
|
|
|
+ self.expires = expires
|
|
|
+ if self.expires is None:
|
|
|
+ self.expires = conf.AMQP_TASK_RESULT_EXPIRES
|
|
|
+ if isinstance(self.expires, timedelta):
|
|
|
+ self.expires = timeutils.timedelta_seconds(self.expires)
|
|
|
+ if self.expires is not None:
|
|
|
+ self.expires = int(self.expires)
|
|
|
|
|
|
for setting_name in self.deprecated_settings:
|
|
|
if getattr(settings, setting_name, None) is not None:
|
|
@@ -104,7 +116,10 @@ class RedisBackend(KeyValueStoreBackend):
|
|
|
return self.open().get(key)
|
|
|
|
|
|
def set(self, key, value):
|
|
|
- self.open().set(key, value)
|
|
|
+ r = self.open()
|
|
|
+ r.set(key, value)
|
|
|
+ if self.expires is not None:
|
|
|
+ r.expire(key, self.expires)
|
|
|
|
|
|
def delete(self, key):
|
|
|
self.open().delete(key)
|