couchbase.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.couchbase
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. CouchBase result store backend.
  6. """
  7. from __future__ import absolute_import
  8. from datetime import datetime
  9. try:
  10. import couchbase
  11. from couchbase import Couchbase
  12. from couchbase.connection import Connection
  13. from couchbase.exceptions import NotFoundError
  14. except ImportError:
  15. couchbase = None # noqa
  16. from kombu.utils.url import _parse_url
  17. from celery import states
  18. from celery.exceptions import ImproperlyConfigured
  19. from celery.utils.timeutils import maybe_timedelta
  20. from .base import KeyValueStoreBackend
  21. import logging
  22. class CouchBaseBackend(KeyValueStoreBackend):
  23. bucket = "default"
  24. host = 'localhost'
  25. port = 8091
  26. username = None
  27. password = None
  28. quiet=False
  29. conncache=None
  30. unlock_gil=True
  31. timeout=2.5
  32. transcoder=None
  33. # supports_autoexpire = False
  34. def __init__(self, url=None, *args, **kwargs):
  35. """Initialize CouchBase backend instance.
  36. :raise celery.exceptions.ImproperlyConfigured: if
  37. module :mod:`couchbase` is not available.
  38. """
  39. super(CouchBaseBackend, self).__init__(*args, **kwargs)
  40. self.expires = kwargs.get('expires') or maybe_timedelta(
  41. self.app.conf.CELERY_TASK_RESULT_EXPIRES)
  42. if not couchbase:
  43. raise ImproperlyConfigured(
  44. 'You need to install the couchbase library to use the '
  45. 'CouchBase backend.')
  46. uhost = uport = uname = upass = ubucket = None
  47. if url:
  48. _, uhost, uport, uname, upass, ubucket, _ = _parse_url(url)
  49. ubucket = ubucket.strip('/') if ubucket else None
  50. config = self.app.conf.get('CELERY_COUCHBASE_BACKEND_SETTINGS', None)
  51. if config is not None:
  52. if not isinstance(config, dict):
  53. raise ImproperlyConfigured(
  54. 'Couchbase backend settings should be grouped in a dict')
  55. else:
  56. config = {}
  57. self.host = uhost or config.get('host', self.host)
  58. self.port = int(uport or config.get('port', self.port))
  59. self.bucket = ubucket or config.get('bucket', self.bucket)
  60. self.username = uname or config.get('username', self.username)
  61. self.password = upass or config.get('password', self.password)
  62. self._connection = None
  63. def _get_connection(self):
  64. """Connect to the Couchbase server."""
  65. if self._connection is None:
  66. kwargs = {
  67. 'bucket': self.bucket,
  68. 'host': self.host
  69. }
  70. if self.port:
  71. kwargs.update({'port': self.port})
  72. if self.username:
  73. kwargs.update({'username': self.username})
  74. if self.password:
  75. kwargs.update({'password': self.password})
  76. logging.debug("couchbase settings %s" % kwargs)
  77. self._connection = Connection(
  78. **dict(kwargs)
  79. )
  80. return self._connection
  81. @property
  82. def connection(self):
  83. return self._get_connection()
  84. def get(self, key):
  85. try:
  86. return self.connection.get(key).value
  87. except NotFoundError:
  88. return None
  89. def set(self, key, value):
  90. self.connection.set(key, value)
  91. def mget(self, keys):
  92. return [self.get(key) for key in keys]
  93. def delete(self, key):
  94. self.connection.delete(key)