riak.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. # -*- coding: utf-8 -*-
  2. """Riak result store backend."""
  3. from __future__ import absolute_import, unicode_literals
  4. import sys
  5. from kombu.utils.url import _parse_url
  6. from celery.exceptions import ImproperlyConfigured
  7. from .base import KeyValueStoreBackend
  8. try:
  9. import riak
  10. from riak import RiakClient
  11. from riak.resolver import last_written_resolver
  12. except ImportError: # pragma: no cover
  13. riak = RiakClient = last_written_resolver = None # noqa
  14. __all__ = ('RiakBackend',)
  15. E_BUCKET_NAME = """\
  16. Riak bucket names must be composed of ASCII characters only, not: {0!r}\
  17. """
  18. if sys.version_info[0] == 3:
  19. def to_bytes(s):
  20. return s.encode() if isinstance(s, str) else s
  21. def str_decode(s, encoding):
  22. return to_bytes(s).decode(encoding)
  23. else:
  24. def str_decode(s, encoding):
  25. return s.decode('ascii')
  26. def is_ascii(s):
  27. try:
  28. str_decode(s, 'ascii')
  29. except UnicodeDecodeError:
  30. return False
  31. return True
  32. class RiakBackend(KeyValueStoreBackend):
  33. """Riak result backend.
  34. Raises:
  35. celery.exceptions.ImproperlyConfigured:
  36. if module :pypi:`riak` is not available.
  37. """
  38. # TODO: allow using other protocols than protobuf ?
  39. #: default protocol used to connect to Riak, might be `http` or `pbc`
  40. protocol = 'pbc'
  41. #: default Riak bucket name (`default`)
  42. bucket_name = 'celery'
  43. #: default Riak server hostname (`localhost`)
  44. host = 'localhost'
  45. #: default Riak server port (8087)
  46. port = 8087
  47. _bucket = None
  48. def __init__(self, host=None, port=None, bucket_name=None, protocol=None,
  49. url=None, *args, **kwargs):
  50. super(RiakBackend, self).__init__(*args, **kwargs)
  51. self.url = url
  52. if not riak:
  53. raise ImproperlyConfigured(
  54. 'You need to install the riak library to use the '
  55. 'Riak backend.')
  56. uhost = uport = upass = ubucket = None
  57. if url:
  58. _, uhost, uport, _, upass, ubucket, _ = _parse_url(url)
  59. if ubucket:
  60. ubucket = ubucket.strip('/')
  61. config = self.app.conf.get('riak_backend_settings', None)
  62. if config is not None:
  63. if not isinstance(config, dict):
  64. raise ImproperlyConfigured(
  65. 'Riak backend settings should be grouped in a dict')
  66. else:
  67. config = {}
  68. self.host = uhost or config.get('host', self.host)
  69. self.port = int(uport or config.get('port', self.port))
  70. self.bucket_name = ubucket or config.get('bucket', self.bucket_name)
  71. self.protocol = protocol or config.get('protocol', self.protocol)
  72. # riak bucket must be ascii letters or numbers only
  73. if not is_ascii(self.bucket_name):
  74. raise ValueError(E_BUCKET_NAME.format(self.bucket_name))
  75. self._client = None
  76. def _get_client(self):
  77. """Get client connection."""
  78. if self._client is None or not self._client.is_alive():
  79. self._client = RiakClient(protocol=self.protocol,
  80. host=self.host,
  81. pb_port=self.port)
  82. self._client.resolver = last_written_resolver
  83. return self._client
  84. def _get_bucket(self):
  85. """Connect to our bucket."""
  86. if (
  87. self._client is None or not self._client.is_alive() or
  88. not self._bucket
  89. ):
  90. self._bucket = self.client.bucket(self.bucket_name)
  91. return self._bucket
  92. @property
  93. def client(self):
  94. return self._get_client()
  95. @property
  96. def bucket(self):
  97. return self._get_bucket()
  98. def get(self, key):
  99. return self.bucket.get(key).data
  100. def set(self, key, value):
  101. _key = self.bucket.new(key, data=value)
  102. _key.store()
  103. def mget(self, keys):
  104. return [self.get(key).data for key in keys]
  105. def delete(self, key):
  106. self.bucket.delete(key)