# couchbase.py (3.0 KB)
  1. # -*- coding: utf-8 -*-
  2. """Couchbase result store backend."""
  3. import logging
  4. from kombu.utils.encoding import str_t
  5. from kombu.utils.url import _parse_url
  6. from celery.exceptions import ImproperlyConfigured
  7. from .base import KeyValueStoreBackend
  8. try:
  9. from couchbase import Couchbase
  10. from couchbase.connection import Connection
  11. from couchbase.exceptions import NotFoundError
  12. except ImportError:
  13. Couchbase = Connection = NotFoundError = None # noqa
  14. __all__ = ['CouchbaseBackend']
  class CouchbaseBackend(KeyValueStoreBackend):
      """Couchbase backend.

      Connection settings are resolved in this order of precedence:

      1. components parsed from the ``url`` argument,
      2. the ``couchbase_backend_settings`` dict in the app configuration,
      3. the class-level defaults declared below.

      Raises:
          celery.exceptions.ImproperlyConfigured:
              if module :pypi:`couchbase` is not available, or if
              ``couchbase_backend_settings`` is set but is not a dict.
      """

      # Class-level defaults, used when neither the URL nor the
      # ``couchbase_backend_settings`` dict supplies a value.
      bucket = 'default'
      host = 'localhost'
      port = 8091
      username = None
      password = None
      quiet = False
      timeout = 2.5

      # Use str as couchbase key not bytes
      key_t = str_t

      def __init__(self, url=None, *args, **kwargs):
          """Resolve connection settings; the connection itself is lazy.

          Arguments:
              url: optional backend URL; its host, port, username,
                  password and path (used as the bucket name) override
                  the configured values when present.
              *args: forwarded to the parent backend initializer.
              **kwargs: forwarded to the parent backend initializer.
          """
          super().__init__(*args, **kwargs)
          self.url = url

          if Couchbase is None:
              raise ImproperlyConfigured(
                  'You need to install the couchbase library to use the '
                  'Couchbase backend.',
              )

          uhost = uport = uname = upass = ubucket = None
          if url:
              _, uhost, uport, uname, upass, ubucket, _ = _parse_url(url)
              # The bucket comes from the URL path; drop surrounding slashes.
              ubucket = ubucket.strip('/') if ubucket else None

          config = self.app.conf.get('couchbase_backend_settings', None)
          if config is not None:
              if not isinstance(config, dict):
                  raise ImproperlyConfigured(
                      'Couchbase backend settings should be grouped in a dict',
                  )
          else:
              config = {}

          # URL value first, then the settings dict, then the class default.
          self.host = uhost or config.get('host', self.host)
          self.port = int(uport or config.get('port', self.port))
          self.bucket = ubucket or config.get('bucket', self.bucket)
          self.username = uname or config.get('username', self.username)
          self.password = upass or config.get('password', self.password)

          # Created on first use by ``_get_connection``.
          self._connection = None

      def _get_connection(self):
          """Connect to the Couchbase server.

          The connection is created on the first call and cached on the
          instance; later calls return the cached object.
          """
          if self._connection is None:
              kwargs = {'bucket': self.bucket, 'host': self.host}

              # Only pass optional settings that actually have a value.
              if self.port:
                  kwargs.update({'port': self.port})
              if self.username:
                  kwargs.update({'username': self.username})
              if self.password:
                  kwargs.update({'password': self.password})

              # Never write the credential to the log: log a copy of the
              # settings with the password masked.
              loggable = dict(kwargs)
              if 'password' in loggable:
                  loggable['password'] = '********'
              logging.debug('couchbase settings %r', loggable)

              self._connection = Connection(**kwargs)
          return self._connection

      @property
      def connection(self):
          """The lazily created connection (see ``_get_connection``)."""
          return self._get_connection()

      def get(self, key):
          """Return the value stored under ``key``, or None if not found."""
          try:
              return self.connection.get(key).value
          except NotFoundError:
              return None

      def set(self, key, value):
          """Store ``value`` under ``key``."""
          self.connection.set(key, value)

      def mget(self, keys):
          """Return a list of values for ``keys`` (None for missing keys).

          Each key is fetched with a separate ``get`` call.
          """
          return [self.get(key) for key in keys]

      def delete(self, key):
          """Delete ``key`` from the bucket."""
          self.connection.delete(key)