Browse Source

Added backend for Couchbase

Alman One 11 years ago
parent
commit
24f6205c46
3 changed files with 323 additions and 0 deletions
  1. 1 0
      celery/backends/__init__.py
  2. 146 0
      celery/backends/couchbase.py
  3. 176 0
      celery/tests/backends/test_couchbase.py

+ 1 - 0
celery/backends/__init__.py

@@ -30,6 +30,7 @@ BACKEND_ALIASES = {
     'mongodb': 'celery.backends.mongodb:MongoBackend',
     'database': 'celery.backends.database:DatabaseBackend',
     'cassandra': 'celery.backends.cassandra:CassandraBackend',
+    'couchbase': 'celery.backends.couchbase:CouchBaseBackend',
     'disabled': 'celery.backends.base:DisabledBackend',
 }
 

+ 146 - 0
celery/backends/couchbase.py

@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+"""
+    celery.backends.couchbase
+    ~~~~~~~~~~~~~~~~~~~~~~~
+
+    CouchBase result store backend.
+
+"""
+from __future__ import absolute_import
+
+from datetime import datetime
+
+try:
+    import couchbase
+    from couchbase import Couchbase 
+    from couchbase.connection import Connection
+    from couchbase.user_constants import *
+    import couchbase._libcouchbase as _LCB
+except ImportError:
+    couchbase = None   # noqa
+
+from kombu.utils import cached_property
+from kombu.utils.url import _parse_url
+
+from celery import states
+from celery.exceptions import ImproperlyConfigured
+from celery.five import string_t
+from celery.utils.timeutils import maybe_timedelta
+
+from .base import KeyValueStoreBackend
+import logging
+
+# class Bunch(object):
+
+#     def __init__(self, **kw):
+#         self.__dict__.update(kw)
+
+def is_single_url(obj):
+    try:
+        return isinstance(obj, basestring)
+    except NameError:
+        return isinstance(obj, str)
+
+class CouchBaseBackend(KeyValueStoreBackend):
+
+    bucket = "default"
+    host = 'localhost'
+    port = 8091
+    username = None
+    password = None
+    quiet=False
+    conncache=None
+    unlock_gil=True
+    timeout=2.5
+    transcoder=None
+    # supports_autoexpire = False
+    
+    def __init__(self, url=None, *args, **kwargs):
+        """Initialize CouchBase backend instance.
+
+        :raises celery.exceptions.ImproperlyConfigured: if
+            module :mod:`pymongo` is not available.
+
+        """
+        super(CouchBaseBackend, self).__init__(*args, **kwargs)
+
+        self.expires = kwargs.get('expires') or maybe_timedelta(
+            self.app.conf.CELERY_TASK_RESULT_EXPIRES)
+
+        if not couchbase:
+            raise ImproperlyConfigured(
+                'You need to install the couchbase library to use the '
+                'CouchBase backend.')
+
+        uhost = uport = uname = upass = ubucket = None
+        if url:
+            _, uhost, uport, uname, upass, ubucket, _ = _parse_url(url)
+            ubucket = ubucket.strip('/') if ubucket else 'default_bucket'
+
+        config = self.app.conf.get('CELERY_COUCHBASE_BACKEND_SETTINGS', None)
+        if config is not None:
+            if not isinstance(config, dict):
+                raise ImproperlyConfigured(
+                    'Couchbase backend settings should be grouped in a dict')
+        else:
+            config = {}
+            
+        self.host = uhost or config.get('host', self.host)
+        self.port = int(uport or config.get('port', self.port))
+        self.bucket = ubucket or config.get('bucket', self.bucket)
+        self.username = uname or config.get('username', self.username)
+        self.password = upass or config.get('password', self.password)
+        # self.quiet = config.get('quiet', self.quiet)
+        # self.conncache = config.get('conncache', self.conncache)
+        # self.unlock_gil = config.get('unlock_gil', self.unlock_gil)
+        # self.timeout = config.get('timeout', self.timeout)
+        # self.transcoder = config.get('transcoder', self.transcoder)
+        # self.lockmode = config.get('lockmode', self.lockmode)
+            
+        self._connection = None
+
+    def _get_connection(self):
+        """Connect to the MongoDB server."""
+        if self._connection is None:
+            kwargs = {
+                'bucket': self.bucket,
+                'host': self.host
+            }
+
+            if self.port:
+                kwargs.update({'port': self.port})
+            if self.username:
+                kwargs.update({'username': self.username})
+            if self.password:
+                kwargs.update({'password': self.password})
+
+            logging.debug("couchbase settings %s" % kwargs)
+            self._connection = Connection(
+                **dict(kwargs)
+            )
+        return self._connection
+
+    @property
+    def connection(self):
+        return self._get_connection()
+
+    def get(self, key):
+        try:
+            if self._connection == None:
+                self._connection = self._get_connection()
+            return self._connection.get(key).value
+        except:
+            return None
+
+    def set(self, key, value):
+        if self._connection == None:
+            self._connection = self._get_connection()
+        self._connection.set(key, value)
+
+    def mget(self, keys):
+        return [self.get(key) for key in keys]
+
+    def delete(self, key):
+        if self._connection == None:
+            self._connection = self._get_connection()
+        self._connection.delete(key)

+ 176 - 0
celery/tests/backends/test_couchbase.py

@@ -0,0 +1,176 @@
+from __future__ import absolute_import
+
+from datetime import timedelta
+
+from mock import MagicMock, Mock, patch, sentinel
+from nose import SkipTest
+
+from celery import Celery
+from celery import current_app
+from celery import states
+from celery.backends import couchbase as module
+from celery.backends.couchbase import CouchBaseBackend, couchbase
+from celery.datastructures import AttributeDict
+from celery.exceptions import ImproperlyConfigured
+from celery.result import AsyncResult
+from celery.task import subtask
+from celery.utils.timeutils import timedelta_seconds
+from celery.backends.base import KeyValueStoreBackend
+from celery import backends
+from celery.tests.case import AppCase
+from celery.app import app_or_default
+from pprint import pprint
+
+COUCHBASE_BUCKET = 'celery_bucket'
+
+class test_CouchBaseBackend(AppCase):
+
+    def setUp(self):
+        if couchbase is None:
+            raise SkipTest('couchbase is not installed.')
+        from celery.app import current_app
+        app = self.app = self._current_app = current_app()
+        self.backend = CouchBaseBackend(app=self.app)
+        
+    def test_init_no_couchbase(self):
+        """
+        test init no couchbase raises
+        """
+        prev, module.couchbase = module.couchbase, None
+        try:
+            with self.assertRaises(ImproperlyConfigured):
+                CouchBaseBackend(app=self.app)
+        finally:
+            module.couchbase = prev
+
+    def test_init_no_settings(self):
+        """
+        test init no settings
+        """
+        celery = Celery(set_as_current=False)
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = []
+        with self.assertRaises(ImproperlyConfigured):
+            CouchBaseBackend(app=celery)
+
+    def test_init_settings_is_None(self):
+        """
+        Test init settings is None
+        """
+        celery = Celery(set_as_current=False)
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = None
+        CouchBaseBackend(app=celery)
+
+    def test_get_connection_connection_exists(self):
+        """
+        Test get existing connection
+        """
+        with patch('couchbase.connection.Connection') as mock_Connection:
+            self.backend._connection = sentinel._connection
+
+            connection = self.backend._get_connection()
+
+            self.assertEquals(sentinel._connection, connection)
+            self.assertFalse(mock_Connection.called)
+
+    def test_get(self):
+        """
+        Test get
+        CouchBaseBackend.get
+        should return  and take two params
+        db conn to couchbase is mocked
+        TODO Should test on key not exists
+        """
+        celery = Celery(set_as_current=False)
+        # celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = None
+        
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {}
+
+        x = CouchBaseBackend(app=celery)
+        x._connection = Mock()
+        mocked_get = x._connection.get = Mock()
+        mocked_get.return_value.value = sentinel.retval
+        # should return None
+        self.assertEqual(x.get('1f3fab'), sentinel.retval)
+        x._connection.get.assert_called_once_with('1f3fab')
+
+    # betta
+    def test_set(self):
+        """
+        Test set
+        CouchBaseBackend.set
+        should return None and take two params
+        db conn to couchbase is mocked
+        """
+        celery = Celery(set_as_current=False)
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = None
+        x = CouchBaseBackend(app=celery)
+        x._connection = MagicMock()
+        x._connection.set = MagicMock()
+        # should return None
+        self.assertIsNone(x.set(sentinel.key, sentinel.value))
+    #     x._connection.set.assert_called_once_with(sentinel.key, sentinel.value)
+
+
+    def test_delete(self):
+        """
+        Test get
+        CouchBaseBackend.get
+        should return  and take two params
+        db conn to couchbase is mocked
+        TODO Should test on key not exists
+        """
+        celery = Celery(set_as_current=False)
+        # celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = None
+        
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {}
+
+        x = CouchBaseBackend(app=celery)
+        x._connection = Mock()
+        mocked_delete = x._connection.delete = Mock()
+        mocked_delete.return_value = None
+        # should return None
+        self.assertIsNone(x.delete('1f3fab'))
+        x._connection.delete.assert_called_once_with('1f3fab')
+
+    def test_config_params(self):
+        """
+        test celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS
+        is properly set
+        """
+        celery = Celery(set_as_current=False)
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {'bucket':'mycoolbucket', 
+                                                         'host':['here.host.com',
+                                                                 'there.host.com'],
+                                                         'username':'johndoe',
+                                                         'password':'mysecret',
+                                                         'port': '1234'}
+        x = CouchBaseBackend(app=celery)
+        self.assertEqual(x.bucket, "mycoolbucket")
+        self.assertEqual(x.host, ['here.host.com', 'there.host.com'],)
+        self.assertEqual(x.username, "johndoe",)
+        self.assertEqual(x.password, 'mysecret')
+        self.assertEqual(x.port, 1234)
+
+    def test_backend_by_url(self, url='couchbase://myhost/mycoolbucket'):
+        """
+        test get backend by url
+        """
+        from celery.backends.couchbase import CouchBaseBackend
+        backend, url_ = backends.get_backend_by_url(url)
+        self.assertIs(backend, CouchBaseBackend)
+        self.assertEqual(url_, url)
+
+    def test_backend_params_by_url(self):
+        """
+        test get backend params by url
+        """
+        celery = Celery(set_as_current=False,
+                        backend='couchbase://johndoe:mysecret@myhost:123/mycoolbucket')
+        x = celery.backend
+        self.assertEqual(x.bucket, "mycoolbucket")
+        self.assertEqual(x.host, "myhost")
+        self.assertEqual(x.username, "johndoe")
+        self.assertEqual(x.password, "mysecret")
+        self.assertEqual(x.port, 123)
+