couchdb.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. # -*- coding: utf-8 -*-
  2. """CouchDB result store backend."""
  3. from __future__ import absolute_import, unicode_literals
  4. from kombu.utils.url import _parse_url
  5. from celery.exceptions import ImproperlyConfigured
  6. from .base import KeyValueStoreBackend
  7. try:
  8. import pycouchdb
  9. except ImportError:
  10. pycouchdb = None # noqa
  11. __all__ = ['CouchBackend']
  12. ERR_LIB_MISSING = """\
  13. You need to install the pycouchdb library to use the CouchDB result backend\
  14. """
  15. class CouchBackend(KeyValueStoreBackend):
  16. """CouchDB backend.
  17. Raises:
  18. celery.exceptions.ImproperlyConfigured:
  19. if module :pypi:`pycouchdb` is not available.
  20. """
  21. container = 'default'
  22. scheme = 'http'
  23. host = 'localhost'
  24. port = 5984
  25. username = None
  26. password = None
  27. def __init__(self, url=None, *args, **kwargs):
  28. super(CouchBackend, self).__init__(*args, **kwargs)
  29. self.url = url
  30. if pycouchdb is None:
  31. raise ImproperlyConfigured(ERR_LIB_MISSING)
  32. uscheme = uhost = uport = uname = upass = ucontainer = None
  33. if url:
  34. _, uhost, uport, uname, upass, ucontainer, _ = _parse_url(url) # noqa
  35. ucontainer = ucontainer.strip('/') if ucontainer else None
  36. self.scheme = uscheme or self.scheme
  37. self.host = uhost or self.host
  38. self.port = int(uport or self.port)
  39. self.container = ucontainer or self.container
  40. self.username = uname or self.username
  41. self.password = upass or self.password
  42. self._connection = None
  43. def _get_connection(self):
  44. """Connect to the CouchDB server."""
  45. if self.username and self.password:
  46. conn_string = '%s://%s:%s@%s:%s' % (
  47. self.scheme, self.username, self.password,
  48. self.host, str(self.port))
  49. server = pycouchdb.Server(conn_string, authmethod='basic')
  50. else:
  51. conn_string = '%s://%s:%s' % (
  52. self.scheme, self.host, str(self.port))
  53. server = pycouchdb.Server(conn_string)
  54. try:
  55. return server.database(self.container)
  56. except pycouchdb.exceptions.NotFound:
  57. return server.create(self.container)
  58. @property
  59. def connection(self):
  60. if self._connection is None:
  61. self._connection = self._get_connection()
  62. return self._connection
  63. def get(self, key):
  64. try:
  65. return self.connection.get(key)['value']
  66. except pycouchdb.exceptions.NotFound:
  67. return None
  68. def set(self, key, value):
  69. data = {'_id': key, 'value': value}
  70. try:
  71. self.connection.save(data)
  72. except pycouchdb.exceptions.Conflict:
  73. # document already exists, update it
  74. data = self.connection.get(key)
  75. data['value'] = value
  76. self.connection.save(data)
  77. def mget(self, keys):
  78. return [self.get(key) for key in keys]
  79. def delete(self, key):
  80. self.connection.delete(key)