pyredis.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. from datetime import timedelta
  2. from kombu.utils import cached_property
  3. from celery.backends.base import KeyValueStoreBackend
  4. from celery.exceptions import ImproperlyConfigured
  5. from celery.utils import timeutils
  6. try:
  7. import redis
  8. from redis.exceptions import ConnectionError
  9. except ImportError:
  10. redis = None # noqa
  11. ConnectionError = None # noqa
  12. class RedisBackend(KeyValueStoreBackend):
  13. """Redis task result store."""
  14. #: redis-py client module.
  15. redis = redis
  16. #: default Redis server hostname (`localhost`).
  17. redis_host = "localhost"
  18. #: default Redis server port (6379)
  19. redis_port = 6379
  20. redis_db = 0
  21. #: default Redis password (:const:`None`)
  22. redis_password = None
  23. def __init__(self, redis_host=None, redis_port=None, redis_db=None,
  24. redis_password=None,
  25. expires=None, **kwargs):
  26. super(RedisBackend, self).__init__(**kwargs)
  27. if self.redis is None:
  28. raise ImproperlyConfigured(
  29. "You need to install the redis library in order to use "
  30. + "Redis result store backend.")
  31. self.redis_host = (redis_host or
  32. self.app.conf.get("REDIS_HOST") or
  33. self.redis_host)
  34. self.redis_port = (redis_port or
  35. self.app.conf.get("REDIS_PORT") or
  36. self.redis_port)
  37. self.redis_db = (redis_db or
  38. self.app.conf.get("REDIS_DB") or
  39. self.redis_db)
  40. self.redis_password = (redis_password or
  41. self.app.conf.get("REDIS_PASSWORD") or
  42. self.redis_password)
  43. self.expires = expires
  44. if self.expires is None:
  45. self.expires = self.app.conf.CELERY_TASK_RESULT_EXPIRES
  46. if isinstance(self.expires, timedelta):
  47. self.expires = timeutils.timedelta_seconds(self.expires)
  48. if self.expires is not None:
  49. self.expires = int(self.expires)
  50. self.redis_port = int(self.redis_port)
  51. def get(self, key):
  52. return self.client.get(key)
  53. def set(self, key, value):
  54. client = self.client
  55. client.set(key, value)
  56. if self.expires is not None:
  57. client.expire(key, self.expires)
  58. def delete(self, key):
  59. self.client.delete(key)
  60. def close(self):
  61. """Closes the Redis connection."""
  62. del(self.client)
  63. def process_cleanup(self):
  64. self.close()
  65. def on_chord_apply(self, *args, **kwargs):
  66. pass
  67. def on_chord_part_return(self, task, keyprefix="chord-unlock-%s"):
  68. from celery.task.sets import subtask
  69. from celery.result import TaskSetResult
  70. setid = task.request.taskset
  71. key = keyprefix % setid
  72. deps = TaskSetResult.restore(setid, backend=task.backend)
  73. if self.client.incr(key) >= deps.total:
  74. subtask(task.request.chord).delay(deps.join())
  75. deps.delete()
  76. self.client.expire(key, 86400)
  77. @cached_property
  78. def client(self):
  79. return self.redis.Redis(host=self.redis_host,
  80. port=self.redis_port,
  81. db=self.redis_db,
  82. password=self.redis_password)
  83. @client.deleter # noqa
  84. def client(self, client):
  85. client.connection.disconnect()