couchbase.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.couchbase
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~
  5. CouchBase result store backend.
  6. """
  7. from __future__ import absolute_import
  8. import logging
  9. try:
  10. from couchbase import Couchbase
  11. from couchbase.connection import Connection
  12. from couchbase.exceptions import NotFoundError
  13. except ImportError:
  14. Couchbase = Connection = NotFoundError = None # noqa
  15. from kombu.utils.url import _parse_url
  16. from celery.exceptions import ImproperlyConfigured
  17. from .base import KeyValueStoreBackend
  18. __all__ = ['CouchBaseBackend']
  19. class CouchBaseBackend(KeyValueStoreBackend):
  20. bucket = 'default'
  21. host = 'localhost'
  22. port = 8091
  23. username = None
  24. password = None
  25. quiet = False
  26. conncache = None
  27. unlock_gil = True
  28. timeout = 2.5
  29. transcoder = None
  30. # supports_autoexpire = False
  31. def __init__(self, url=None, *args, **kwargs):
  32. """Initialize CouchBase backend instance.
  33. :raises celery.exceptions.ImproperlyConfigured: if
  34. module :mod:`couchbase` is not available.
  35. """
  36. super(CouchBaseBackend, self).__init__(*args, **kwargs)
  37. if Couchbase is None:
  38. raise ImproperlyConfigured(
  39. 'You need to install the couchbase library to use the '
  40. 'CouchBase backend.',
  41. )
  42. uhost = uport = uname = upass = ubucket = None
  43. if url:
  44. _, uhost, uport, uname, upass, ubucket, _ = _parse_url(url)
  45. ubucket = ubucket.strip('/') if ubucket else None
  46. config = self.app.conf.get('CELERY_COUCHBASE_BACKEND_SETTINGS', None)
  47. if config is not None:
  48. if not isinstance(config, dict):
  49. raise ImproperlyConfigured(
  50. 'Couchbase backend settings should be grouped in a dict',
  51. )
  52. else:
  53. config = {}
  54. self.host = uhost or config.get('host', self.host)
  55. self.port = int(uport or config.get('port', self.port))
  56. self.bucket = ubucket or config.get('bucket', self.bucket)
  57. self.username = uname or config.get('username', self.username)
  58. self.password = upass or config.get('password', self.password)
  59. self._connection = None
  60. def _get_connection(self):
  61. """Connect to the Couchbase server."""
  62. if self._connection is None:
  63. kwargs = {'bucket': self.bucket, 'host': self.host}
  64. if self.port:
  65. kwargs.update({'port': self.port})
  66. if self.username:
  67. kwargs.update({'username': self.username})
  68. if self.password:
  69. kwargs.update({'password': self.password})
  70. logging.debug('couchbase settings %r', kwargs)
  71. self._connection = Connection(**kwargs)
  72. return self._connection
  73. @property
  74. def connection(self):
  75. return self._get_connection()
  76. def get(self, key):
  77. try:
  78. return self.connection.get(key).value
  79. except NotFoundError:
  80. return None
  81. def set(self, key, value):
  82. self.connection.set(key, value)
  83. def mget(self, keys):
  84. return [self.get(key) for key in keys]
  85. def delete(self, key):
  86. self.connection.delete(key)