redis.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. from __future__ import absolute_import
  2. from kombu.utils import cached_property
  3. from celery.backends.base import KeyValueStoreBackend
  4. from celery.exceptions import ImproperlyConfigured
  5. try:
  6. import redis
  7. from redis.exceptions import ConnectionError
  8. except ImportError:
  9. redis = None # noqa
  10. ConnectionError = None # noqa
  class RedisBackend(KeyValueStoreBackend):
      """Redis task result store.

      Connection settings are resolved in this order: explicit constructor
      arguments, then the app configuration (``REDIS_*`` keys first, then
      ``CELERY_REDIS_*``), then the class-level defaults declared below.
      """

      #: redis-py client module.
      redis = redis

      #: default Redis server hostname (`localhost`).
      host = "localhost"

      #: default Redis server port (6379)
      port = 6379

      #: default Redis db number (0)
      db = 0

      #: default Redis password (:const:`None`)
      password = None

      def __init__(self, host=None, port=None, db=None, password=None,
                   expires=None, **kwargs):
          """Resolve the connection settings for this backend.

          :param host: Redis server hostname; falls back to configuration,
              then to :attr:`host`.
          :param port: Redis server port; coerced with :func:`int`.
          :param db: Redis database number.  ``0`` is honoured as an
              explicit value; only :const:`None` triggers the fallbacks.
          :param password: Redis password, if any.
          :param expires: result expiry, normalised by
              ``prepare_expires(expires, type=int)``.
          :param kwargs: forwarded unchanged to the parent constructor.
          :raises ImproperlyConfigured: if the ``redis`` library could not
              be imported.
          """
          super(RedisBackend, self).__init__(**kwargs)
          conf = self.app.conf
          if self.redis is None:
              raise ImproperlyConfigured(
                  "You need to install the redis library in order to use "
                  "Redis result store backend.")

          # For compatibility with the old REDIS_* configuration keys.
          def _get(key):
              # Return the first configured value that is actually set.
              # A key that exists but holds None must not hide a value
              # configured under the other prefix.
              for prefix in "REDIS_%s", "CELERY_REDIS_%s":
                  try:
                      value = conf[prefix % key]
                  except KeyError:
                      continue
                  if value is not None:
                      return value
              return None

          self.host = host or _get("HOST") or self.host
          self.port = int(port or _get("PORT") or self.port)

          # Database 0 is a valid choice but is falsy, so an ``or`` chain
          # would silently replace an explicit ``db=0`` with the configured
          # value.  Compare against None instead.
          if db is None:
              db = _get("DB")
          if db is None:
              db = self.db
          self.db = db

          self.password = password or _get("PASSWORD") or self.password
          self.expires = self.prepare_expires(expires, type=int)

      def get(self, key):
          """Return the value the client holds for `key`."""
          return self.client.get(key)

      def mget(self, keys):
          """Return the client's ``mget`` result for `keys`."""
          return self.client.mget(keys)

      def set(self, key, value):
          """Store `value` under `key`, applying :attr:`expires` if set.

          The value is written first and the expiry is applied with a second
          client call, and only when :attr:`expires` is not :const:`None`.
          """
          client = self.client
          client.set(key, value)
          if self.expires is not None:
              client.expire(key, self.expires)

      def delete(self, key):
          """Delete `key` through the client."""
          self.client.delete(key)

      def process_cleanup(self):
          """No-op for this backend."""
          pass

      def on_chord_apply(self, *args, **kwargs):
          """No-op for this backend; all arguments are ignored."""
          pass

      def on_chord_part_return(self, task, propagate=False,
                               keyprefix="chord-unlock-%s"):
          """Count one finished chord part and fire the callback when done.

          Increments the counter stored under ``keyprefix % taskset_id``.
          Once the counter reaches the restored taskset's ``total``, the
          subtask in ``task.request.chord`` is applied with the joined
          results and the stored taskset result is deleted.

          :param task: the task that just returned; its ``request`` supplies
              the taskset id and the chord callback, and its ``backend`` is
              used to restore the taskset result.
          :param propagate: passed through to ``deps.join``.
          :param keyprefix: ``%``-format template for the counter key.
          """
          # Imported here rather than at module level, as in the original.
          from celery.task.sets import subtask
          from celery.result import TaskSetResult

          setid = task.request.taskset
          key = keyprefix % setid
          deps = TaskSetResult.restore(setid, backend=task.backend)
          if self.client.incr(key) >= deps.total:
              subtask(task.request.chord).delay(deps.join(propagate=propagate))
              deps.delete()
          # Applied on every call, whether or not the chord completed:
          # the counter key expires after 86400 seconds (24 hours).
          self.client.expire(key, 86400)

      @cached_property
      def client(self):
          """Redis client built from the resolved connection settings.

          Wrapped in ``cached_property``.
          """
          return self.redis.Redis(host=self.host, port=self.port,
                                  db=self.db, password=self.password)

      def __reduce__(self, args=(), kwargs=None):
          """Pickle support: add this backend's settings to the parent's.

          :param args: positional arguments forwarded to the parent
              ``__reduce__``.
          :param kwargs: optional mapping of extra keyword arguments.  It is
              copied, never modified in place; the connection settings below
              override any entries with the same name.
          :returns: whatever the parent ``__reduce__`` returns for the
              merged arguments.
          """
          # Copy so that neither a shared default nor the caller's dict is
          # mutated between calls.
          merged = dict(kwargs) if kwargs else {}
          merged.update(
              host=self.host,
              port=self.port,
              db=self.db,
              password=self.password,
              expires=self.expires)
          return super(RedisBackend, self).__reduce__(args, merged)