pyredis.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import warnings
  2. from datetime import timedelta
  3. from celery.backends.base import KeyValueStoreBackend
  4. from celery.exceptions import ImproperlyConfigured
  5. from celery.utils import timeutils
  6. try:
  7. import redis
  8. from redis.exceptions import ConnectionError
  9. except ImportError:
  10. redis = None
  11. ConnectionError = None
  12. class RedisBackend(KeyValueStoreBackend):
  13. """Redis based task backend store.
  14. .. attribute:: redis_host
  15. The hostname to the Redis server.
  16. .. attribute:: redis_port
  17. The port to the Redis server.
  18. Raises :class:`celery.exceptions.ImproperlyConfigured` if
  19. the :setting:`REDIS_HOST` or :setting:`REDIS_PORT` settings is not set.
  20. """
  21. redis_host = "localhost"
  22. redis_port = 6379
  23. redis_db = 0
  24. redis_password = None
  25. redis_timeout = None
  26. redis_connect_retry = None
  27. expires = None
  28. deprecated_settings = frozenset(["REDIS_TIMEOUT",
  29. "REDIS_CONNECT_RETRY"])
  30. def __init__(self, redis_host=None, redis_port=None, redis_db=None,
  31. redis_timeout=None,
  32. redis_password=None,
  33. redis_connect_retry=None,
  34. redis_connect_timeout=None,
  35. expires=None, **kwargs):
  36. super(RedisBackend, self).__init__(**kwargs)
  37. if redis is None:
  38. raise ImproperlyConfigured(
  39. "You need to install the redis library in order to use "
  40. + "Redis result store backend.")
  41. self.redis_host = (redis_host or
  42. self.app.conf.get("REDIS_HOST") or
  43. self.redis_host)
  44. self.redis_port = (redis_port or
  45. self.app.conf.get("REDIS_PORT") or
  46. self.redis_port)
  47. self.redis_db = (redis_db or
  48. self.app.conf.get("REDIS_DB") or
  49. self.redis_db)
  50. self.redis_password = (redis_password or
  51. self.app.conf.get("REDIS_PASSWORD") or
  52. self.redis_password)
  53. self.expires = expires
  54. if self.expires is None:
  55. self.expires = self.app.conf.CELERY_TASK_RESULT_EXPIRES
  56. if isinstance(self.expires, timedelta):
  57. self.expires = timeutils.timedelta_seconds(self.expires)
  58. if self.expires is not None:
  59. self.expires = int(self.expires)
  60. for setting_name in self.deprecated_settings:
  61. if self.app.conf.get(setting_name) is not None:
  62. warnings.warn(
  63. "The setting '%s' is no longer supported by the "
  64. "python Redis client!" % setting_name.upper(),
  65. DeprecationWarning)
  66. if self.redis_port:
  67. self.redis_port = int(self.redis_port)
  68. if not self.redis_host or not self.redis_port:
  69. raise ImproperlyConfigured(
  70. "In order to use the Redis result store backend, you have to "
  71. "set the REDIS_HOST and REDIS_PORT settings")
  72. self._connection = None
  73. def open(self):
  74. """Get :class:`redis.Redis`` instance with the current
  75. server configuration.
  76. The connection is then cached until you do an
  77. explicit :meth:`close`.
  78. """
  79. # connection overrides bool()
  80. if self._connection is None:
  81. self._connection = redis.Redis(host=self.redis_host,
  82. port=self.redis_port,
  83. db=self.redis_db,
  84. password=self.redis_password)
  85. return self._connection
  86. def close(self):
  87. """Close the connection to redis."""
  88. if self._connection is not None:
  89. self._connection.connection.disconnect()
  90. self._connection = None
  91. def process_cleanup(self):
  92. self.close()
  93. def get(self, key):
  94. return self.open().get(key)
  95. def set(self, key, value):
  96. r = self.open()
  97. r.set(key, value)
  98. if self.expires is not None:
  99. r.expire(key, self.expires)
  100. def delete(self, key):
  101. self.open().delete(key)