redis.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from ..exceptions import ImproperlyConfigured
  4. from ..utils import cached_property
  5. from .base import KeyValueStoreBackend
  6. try:
  7. import redis
  8. from redis.exceptions import ConnectionError
  9. except ImportError:
  10. redis = None # noqa
  11. ConnectionError = None # noqa
  class RedisBackend(KeyValueStoreBackend):
      """Redis task result store.

      Results are kept as plain Redis keys through a lazily created
      redis-py client (see :attr:`client`).  Connection settings come from
      constructor arguments, then from the application configuration, then
      from the class-level defaults declared below.
      """

      #: redis-py client module.
      redis = redis

      #: default Redis server hostname (`localhost`).
      host = "localhost"

      #: default Redis server port (6379)
      port = 6379

      #: default Redis db number (0)
      db = 0

      #: default Redis password (:const:`None`)
      password = None

      #: Maximum number of connections in the pool.
      max_connections = None

      # NOTE(review): this flag is only declared here, never read in this
      # file; presumably consumed by result/taskset code to enable
      # ``mget``-based joins -- confirm against the caller.
      supports_native_join = True

      def __init__(self, host=None, port=None, db=None, password=None,
              expires=None, max_connections=None, **kwargs):
          """Configure the backend.

          For ``host``, ``port``, ``db``, ``password`` and
          ``max_connections`` the first *truthy* value wins, looked up in
          this order: the explicit argument, ``CELERY_REDIS_<NAME>`` in the
          app configuration, the legacy ``REDIS_<NAME>`` key, and finally
          the class attribute.  Because the chain uses ``or``, falsy values
          such as ``0`` or ``""`` fall through to the next source.

          :param host: Redis server hostname.
          :param port: Redis server port; the resolved value is passed
              through :func:`int`.
          :param db: Redis database number.
          :param password: Redis password.
          :param expires: result expiry, normalized by
              ``self.prepare_expires(expires, type=int)`` (a helper that is
              not defined in this class).
          :param max_connections: connection pool size limit.
          :param kwargs: forwarded unchanged to the parent constructor.
          :raises ImproperlyConfigured: if the ``redis`` library could not
              be imported.
          """
          super(RedisBackend, self).__init__(**kwargs)
          conf = self.app.conf
          if self.redis is None:
              raise ImproperlyConfigured(
                      "You need to install the redis library in order to use "
                      "Redis result store backend.")

          # For compatibility with the old REDIS_* configuration keys.
          def _get(key):
              for prefix in "CELERY_REDIS_%s", "REDIS_%s":
                  try:
                      return conf[prefix % key]
                  except KeyError:
                      pass
              # Neither spelling is configured.
              return None

          self.host = host or _get("HOST") or self.host
          self.port = int(port or _get("PORT") or self.port)
          self.db = db or _get("DB") or self.db
          self.password = password or _get("PASSWORD") or self.password
          self.expires = self.prepare_expires(expires, type=int)
          self.max_connections = (max_connections
                                  or _get("MAX_CONNECTIONS")
                                  or self.max_connections)

      def get(self, key):
          """Return the value stored under ``key`` (client ``GET``)."""
          return self.client.get(key)

      def mget(self, keys):
          """Return the values for all ``keys`` in one call (client ``MGET``)."""
          return self.client.mget(keys)

      def set(self, key, value):
          """Store ``value`` under ``key`` and publish it.

          When :attr:`expires` is not :const:`None` the key is also given
          that time-to-live.  Finally ``value`` is published on the channel
          named ``key``.
          """
          client = self.client
          client.set(key, value)
          # NOTE(review): SET and EXPIRE are issued as two separate commands,
          # so the key briefly exists without a TTL.
          if self.expires is not None:
              client.expire(key, self.expires)
          client.publish(key, value)

      def delete(self, key):
          """Remove ``key`` from Redis."""
          self.client.delete(key)

      def on_chord_apply(self, setid, body, result=None, **kwargs):
          """Save the chord header's taskset result under ``setid``.

          ``body`` and any extra keyword arguments are accepted but unused
          here.
          """
          self.app.TaskSetResult(setid, result).save()

      def on_chord_part_return(self, task, propagate=False):
          """Count a finished chord member and fire the body when all are done.

          Does nothing if ``task.request.taskset`` is falsy.  Otherwise a
          Redis counter for the chord is incremented; once it reaches the
          taskset's ``total`` the chord callback (``task.request.chord``) is
          dispatched with the joined results, and both the saved taskset and
          the counter are deleted.

          :param task: the task that just returned.
          :param propagate: forwarded to ``deps.join``.
          """
          # Imported here rather than at module level, as in the original.
          from ..task.sets import subtask
          from ..result import TaskSetResult

          setid = task.request.taskset
          if not setid:
              return
          # ``get_key_for_chord`` is not defined in this class; presumably
          # inherited from the base backend.
          key = self.get_key_for_chord(setid)
          deps = TaskSetResult.restore(setid, backend=task.backend)
          if self.client.incr(key) >= deps.total:
              subtask(task.request.chord).delay(deps.join(propagate=propagate))
              deps.delete()
              self.client.delete(key)
          else:
              # Refresh the counter's TTL (86400 seconds = 24 hours) so an
              # unfinished chord does not leave the key behind forever.
              self.client.expire(key, 86400)

      @cached_property
      def client(self):
          """redis-py client bound to a connection pool built from the
          resolved ``host``/``port``/``db``/``password``/``max_connections``.
          """
          pool = self.redis.ConnectionPool(host=self.host, port=self.port,
                                           db=self.db, password=self.password,
                                           max_connections=self.max_connections)
          return self.redis.Redis(connection_pool=pool)

      def __reduce__(self, args=(), kwargs=None):
          """Pickle support: add this backend's connection settings to
          ``kwargs`` and delegate to the parent ``__reduce__``.

          :param args: positional arguments forwarded to the parent.
          :param kwargs: optional extra keyword arguments; entries with the
              same name as a connection setting are overridden.  The mapping
              passed in is not modified.
          """
          # Merge into a fresh dict.  A ``{}`` default would be a single
          # object shared by every call, so updating it in place would leak
          # state between calls (and between instances).
          merged = dict(kwargs or {})
          merged.update(
              dict(host=self.host,
                   port=self.port,
                   db=self.db,
                   password=self.password,
                   expires=self.expires,
                   max_connections=self.max_connections))
          return super(RedisBackend, self).__reduce__(args, merged)