import warnings

from datetime import timedelta

from celery.backends.base import KeyValueStoreBackend
from celery.exceptions import ImproperlyConfigured
from celery.utils import timeutils

try:
    import redis
    from redis.exceptions import ConnectionError
except ImportError:
    # The redis client library is optional at import time; RedisBackend
    # raises ImproperlyConfigured on instantiation when it is missing.
    redis = None
    ConnectionError = None


class RedisBackend(KeyValueStoreBackend):
    """Redis based task backend store.

    .. attribute:: redis_host

        The hostname to the Redis server.

    .. attribute:: redis_port

        The port to the Redis server.

        Raises :class:`celery.exceptions.ImproperlyConfigured` if
        the :setting:`REDIS_HOST` or :setting:`REDIS_PORT` setting is not set.

    .. attribute:: redis_db

        The database passed to the Redis client (class default ``0``).

    .. attribute:: redis_password

        The password passed to the Redis client (class default ``None``).

    .. attribute:: expires

        Time-to-live applied to stored keys, in whole seconds, or ``None``
        to store keys without an expiry.

    """
    redis_host = "localhost"
    redis_port = 6379
    redis_db = 0
    redis_password = None
    redis_timeout = None
    redis_connect_retry = None
    expires = None
    deprecated_settings = frozenset(["REDIS_TIMEOUT",
                                     "REDIS_CONNECT_RETRY"])

    def __init__(self, redis_host=None, redis_port=None, redis_db=None,
            redis_timeout=None,
            redis_password=None,
            redis_connect_retry=None,
            redis_connect_timeout=None,
            expires=None, **kwargs):
        """Resolve the connection settings for the backend.

        Each of ``redis_host``, ``redis_port``, ``redis_db`` and
        ``redis_password`` is taken from the argument first, then from the
        matching ``REDIS_*`` key in ``self.app.conf``, then from the class
        attribute.

        ``redis_timeout``, ``redis_connect_retry`` and
        ``redis_connect_timeout`` are accepted but not used by this
        constructor.

        :keyword expires: Result lifetime as a number of seconds or a
            :class:`~datetime.timedelta`.  When ``None`` the
            ``CELERY_TASK_RESULT_EXPIRES`` setting is used instead.

        :raises celery.exceptions.ImproperlyConfigured: if the ``redis``
            library is not installed, or if no host/port could be resolved.

        """
        super(RedisBackend, self).__init__(**kwargs)
        if redis is None:
            raise ImproperlyConfigured(
                    "You need to install the redis library in order to use "
                    "Redis result store backend.")

        conf = self.app.conf
        self.redis_host = (redis_host or
                           conf.get("REDIS_HOST") or
                           self.redis_host)
        self.redis_port = (redis_port or
                           conf.get("REDIS_PORT") or
                           self.redis_port)
        # ``0`` is a valid database index, so only a missing (``None``)
        # argument falls back to the REDIS_DB setting / class default.
        # Using ``or`` here would let the setting override an explicit 0.
        if redis_db is None:
            redis_db = conf.get("REDIS_DB") or self.redis_db
        self.redis_db = redis_db
        self.redis_password = (redis_password or
                               conf.get("REDIS_PASSWORD") or
                               self.redis_password)

        # Normalize the expiry to an integer number of seconds (or None).
        self.expires = expires
        if self.expires is None:
            self.expires = conf.CELERY_TASK_RESULT_EXPIRES
        if isinstance(self.expires, timedelta):
            self.expires = timeutils.timedelta_seconds(self.expires)
        if self.expires is not None:
            self.expires = int(self.expires)

        # Warn about settings that are still configured but ignored.
        for setting_name in self.deprecated_settings:
            if conf.get(setting_name) is not None:
                warnings.warn(
                    "The setting '%s' is no longer supported by the "
                    "python Redis client!" % setting_name.upper(),
                    DeprecationWarning)

        if self.redis_port:
            self.redis_port = int(self.redis_port)
        if not self.redis_host or not self.redis_port:
            raise ImproperlyConfigured(
                "In order to use the Redis result store backend, you have to "
                "set the REDIS_HOST and REDIS_PORT settings")
        self._connection = None

    def open(self):
        """Get :class:`redis.Redis` instance with the current
        server configuration.

        The connection is then cached until you do an
        explicit :meth:`close`.

        """
        # connection overrides bool(), so compare against None explicitly
        # instead of relying on truthiness.
        if self._connection is None:
            self._connection = redis.Redis(host=self.redis_host,
                                           port=self.redis_port,
                                           db=self.redis_db,
                                           password=self.redis_password)
        return self._connection

    def close(self):
        """Close the connection to redis."""
        if self._connection is not None:
            self._connection.connection.disconnect()
            self._connection = None

    def process_cleanup(self):
        """Close the cached connection, if any."""
        self.close()

    def get(self, key):
        """Return the value the Redis client reports for ``key``."""
        return self.open().get(key)

    def set(self, key, value):
        """Store ``value`` under ``key``.

        When :attr:`expires` is not ``None`` the time-to-live is applied
        with a separate ``expire`` call after the ``set``.

        """
        r = self.open()
        r.set(key, value)
        if self.expires is not None:
            r.expire(key, self.expires)

    def delete(self, key):
        """Delete ``key`` from Redis."""
        self.open().delete(key)