riak.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.riak
  4. ~~~~~~~~~~~~~~~~~~~~
  5. Riak result store backend.
  6. """
  7. from __future__ import absolute_import
  8. import sys
  9. try:
  10. import riak
  11. from riak import RiakClient
  12. from riak.resolver import last_written_resolver
  13. except ImportError: # pragma: no cover
  14. riak = RiakClient = last_written_resolver = None # noqa
  15. from kombu.utils.url import _parse_url
  16. from celery.exceptions import ImproperlyConfigured
  17. from .base import KeyValueStoreBackend
  18. E_BUCKET_NAME = """\
  19. Riak bucket names must be composed of ASCII characters only, not: {0!r}\
  20. """
  21. if sys.version_info[0] == 3:
  22. def to_bytes(s):
  23. return s.encode() if isinstance(s, str) else s
  24. def str_decode(s, encoding):
  25. return to_bytes(s).decode(encoding)
  26. else:
  27. def str_decode(s, encoding):
  28. return s.decode("ascii")
  29. def is_ascii(s):
  30. try:
  31. str_decode(s, 'ascii')
  32. except UnicodeDecodeError:
  33. return False
  34. return True
  35. class RiakBackend(KeyValueStoreBackend):
  36. # TODO: allow using other protocols than protobuf ?
  37. #: default protocol used to connect to Riak, might be `http` or `pbc`
  38. protocol = 'pbc'
  39. #: default Riak bucket name (`default`)
  40. bucket_name = 'celery'
  41. #: default Riak server hostname (`localhost`)
  42. host = 'localhost'
  43. #: default Riak server port (8087)
  44. port = 8087
  45. # supports_autoexpire = False
  46. def __init__(self, host=None, port=None, bucket_name=None, protocol=None,
  47. url=None, *args, **kwargs):
  48. """Initialize Riak backend instance.
  49. :raises celery.exceptions.ImproperlyConfigured: if
  50. module :mod:`riak` is not available.
  51. """
  52. super(RiakBackend, self).__init__(*args, **kwargs)
  53. if not riak:
  54. raise ImproperlyConfigured(
  55. 'You need to install the riak library to use the '
  56. 'Riak backend.')
  57. uhost = uport = uname = upass = ubucket = None
  58. if url:
  59. uprot, uhost, uport, uname, upass, ubucket, _ = _parse_url(url)
  60. if ubucket:
  61. ubucket = ubucket.strip('/')
  62. config = self.app.conf.get('riak_backend_settings', None)
  63. if config is not None:
  64. if not isinstance(config, dict):
  65. raise ImproperlyConfigured(
  66. 'Riak backend settings should be grouped in a dict')
  67. else:
  68. config = {}
  69. self.host = uhost or config.get('host', self.host)
  70. self.port = int(uport or config.get('port', self.port))
  71. self.bucket_name = ubucket or config.get('bucket', self.bucket_name)
  72. self.protocol = protocol or config.get('protocol', self.protocol)
  73. # riak bucket must be ascii letters or numbers only
  74. if not is_ascii(self.bucket_name):
  75. raise ValueError(E_BUCKET_NAME.format(self.bucket_name))
  76. self._client = None
  77. def _get_client(self):
  78. """Get client connection."""
  79. if self._client is None or not self._client.is_alive():
  80. self._client = RiakClient(protocol=self.protocol,
  81. host=self.host,
  82. pb_port=self.port)
  83. self._client.resolver = last_written_resolver
  84. return self._client
  85. def _get_bucket(self):
  86. """Connect to our bucket."""
  87. if (
  88. self._client is None or not self._client.is_alive() or
  89. not self._bucket
  90. ):
  91. self._bucket = self.client.bucket(self.bucket_name)
  92. return self._bucket
  93. @property
  94. def client(self):
  95. return self._get_client()
  96. @property
  97. def bucket(self):
  98. return self._get_bucket()
  99. def get(self, key):
  100. return self.bucket.get(key).data
  101. def set(self, key, value):
  102. _key = self.bucket.new(key, data=value)
  103. _key.store()
  104. def mget(self, keys):
  105. return [self.get(key).data for key in keys]
  106. def delete(self, key):
  107. self.bucket.delete(key)