Kaynağa Gözat

Merge branch 'EvaSDK/add-riak-backend'

Conflicts:
	setup.py
Ask Solem 10 yıl önce
ebeveyn
işleme
648d8ee748

+ 1 - 0
celery/backends/__init__.py

@@ -33,6 +33,7 @@ BACKEND_ALIASES = {
     'database': 'celery.backends.database:DatabaseBackend',
     'cassandra': 'celery.backends.cassandra:CassandraBackend',
     'couchbase': 'celery.backends.couchbase:CouchBaseBackend',
+    'riak': 'celery.backends.riak:RiakBackend',
     'disabled': 'celery.backends.base:DisabledBackend',
 }
 

+ 141 - 0
celery/backends/riak.py

@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+"""
+    celery.backends.riak
+    ~~~~~~~~~~~~~~~~~~~~~~~
+
+    Riak result store backend.
+
+"""
+from __future__ import absolute_import, print_function
+
+from datetime import datetime
+
+try:
+    import riak
+    from riak import RiakClient, RiakNode
+    from riak.resolver import last_written_resolver
+except ImportError:  # pragma: no cover
+    riak = None      # noqa
+
+from kombu.utils.url import _parse_url
+
+from celery import states
+from celery.exceptions import ImproperlyConfigured
+from celery.utils.timeutils import maybe_timedelta
+
+from .base import KeyValueStoreBackend
+
+
+class NonAsciiBucket(Exception):
+    """ Bucket must ne ascii charchters only. """
+
+
+class Validators(object):
+
+    @classmethod
+    def validate_riak_bucket_name(cls, bucket_name):
+        try:
+            bucket_name.decode('ascii')
+        except UnicodeDecodeError as ude:
+            return False
+        return True
+
+
+class RiakBackend(KeyValueStoreBackend):
+    # TODO: allow using other protocols than protobuf ?
+    #: default protocol used to connect to Riak, might be `http` or `pbc`
+    protocol = 'pbc'
+
+    #: default Riak bucket name (`default`)
+    bucket_name = "celery"
+
+    #: default Riak server hostname (`localhost`)
+    host = 'localhost'
+
+    #: default Riak server port (8087)
+    port = 8087
+
+    # supports_autoexpire = False
+
+    def __init__(self, host=None, port=None, bucket_name=None, protocol=None,
+                 url=None, *args, **kwargs):
+        """Initialize Riak backend instance.
+
+        :raises celery.exceptions.ImproperlyConfigured: if
+            module :mod:`riak` is not available.
+        """
+        super(RiakBackend, self).__init__(*args, **kwargs)
+
+        self.expires = kwargs.get('expires') or maybe_timedelta(
+            self.app.conf.CELERY_TASK_RESULT_EXPIRES)
+
+        if not riak:
+            raise ImproperlyConfigured(
+                'You need to install the riak library to use the '
+                'Riak backend.')
+
+        uhost = uport = uname = upass = ubucket = None
+        if url:
+            uprot, uhost, uport, uname, upass, ubucket, _ = _parse_url(url)
+            if ubucket:
+                ubucket = ubucket.strip('/')
+
+        config = self.app.conf.get('CELERY_RIAK_BACKEND_SETTINGS', None)
+        if config is not None:
+            if not isinstance(config, dict):
+                raise ImproperlyConfigured(
+                    'Riak backend settings should be grouped in a dict')
+        else:
+            config = {}
+
+        self.host = uhost or config.get('host', self.host)
+        self.port = int(uport or config.get('port', self.port))
+        self.bucket_name = ubucket or config.get('bucket', self.bucket_name)
+        self.protocol = protocol or config.get('protocol', self.protocol)
+
+        # riak bucket must be ascii letters or numbers only
+        if not Validators.validate_riak_bucket_name(self.bucket_name):
+            raise NonAsciiBucket("Riak bucket names must be ASCII characters")
+
+        self._client = None
+
+    def _get_client(self):
+        """Get client connection"""
+        if self._client is None or not self._client.is_alive():
+            self._client = RiakClient(protocol=self.protocol,
+                                      host=self.host,
+                                      pb_port=self.port)
+            self._client.resolver = last_written_resolver
+        return self._client
+
+    def _get_bucket(self):
+        """Connect to our bucket"""
+        if (
+            self._client is None or not self._client.is_alive()
+            or not self._bucket
+        ):
+            self._bucket = self.client.bucket(self.bucket_name)
+        return self._bucket
+
+    @property
+    def client(self):
+        return self._get_client()
+
+    @property
+    def bucket(self):
+        return self._get_bucket()
+
+    def get(self, key):
+        return self.bucket.get(key).data
+
+    def set(self, key, value):
+        # RiakBucket.new(key=None, data=None, content_type='application/json',
+        # encoded_data=None)
+        _key = self.bucket.new(key, data=value)
+        _key.store()
+
+    def mget(self, keys):
+        return [self.get(key).data for key in keys]
+
+    def delete(self, key):
+        self.bucket.delete(key)

+ 174 - 0
celery/tests/backends/test_riak.py

@@ -0,0 +1,174 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import, with_statement
+
+from mock import MagicMock, Mock, patch, sentinel
+from nose import SkipTest
+
+from celery import Celery
+from celery.backends import riak as module
+from celery.backends.riak import RiakBackend, riak, NonAsciiBucket
+from celery.exceptions import ImproperlyConfigured
+from celery.tests.utils import AppCase
+
+
+RIAK_BUCKET = 'riak_bucket'
+
+
+class test_RiakBackend(AppCase):
+
+    def setUp(self):
+        if riak is None:
+            raise SkipTest('riak is not installed.')
+        from celery.app import current_app
+        self.app = self._current_app = current_app()
+        self.backend = RiakBackend(app=self.app)
+
+    def test_init_no_riak(self):
+        """
+        test init no riak raises
+        """
+        prev, module.riak = module.riak, None
+        try:
+            with self.assertRaises(ImproperlyConfigured):
+                RiakBackend(app=self.app)
+        finally:
+            module.riak = prev
+
+    def test_init_no_settings(self):
+        """
+        test init no settings
+        """
+        celery = Celery(set_as_current=False)
+        celery.conf.CELERY_RIAK_BACKEND_SETTINGS = []
+        with self.assertRaises(ImproperlyConfigured):
+            RiakBackend(app=celery)
+
+    def test_init_settings_is_None(self):
+        """
+        Test init settings is None
+        """
+        celery = Celery(set_as_current=False)
+        celery.conf.CELERY_RIAK_BACKEND_SETTINGS = None
+        RiakBackend(app=celery)
+
+    def test_get_client_client_exists(self):
+        """
+        Test get existing client
+        """
+        with patch('riak.client.RiakClient') as mock_connection:
+            self.backend._client = sentinel._client
+
+            mocked_is_alive = self.backend._client.is_alive = Mock()
+            mocked_is_alive.return_value.value = True
+            client = self.backend._get_client()
+            self.assertEquals(sentinel._client, client)
+            self.assertFalse(mock_connection.called)
+
+    def test_get(self):
+        """
+        Test get
+        RiakBackend.get
+        should return  and take two params
+        db conn to riak is mocked
+        TODO Should test on key not exists
+        """
+        celery = Celery(set_as_current=False)
+
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {}
+
+        backend = RiakBackend(app=celery)
+        backend._client = Mock()
+        backend._bucket = Mock()
+        mocked_get = backend._bucket.get = Mock()
+        mocked_get.return_value.data = sentinel.retval
+        # should return None
+        self.assertEqual(backend.get('1f3fab'), sentinel.retval)
+        backend._bucket.get.assert_called_once_with('1f3fab')
+
+    def test_set(self):
+        """
+        Test set
+        RiakBackend.set
+        should return None and take two params
+        db conn to couchbase is mocked
+        """
+        celery = Celery(set_as_current=False)
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = None
+        backend = RiakBackend(app=celery)
+        backend._client = MagicMock()
+        backend._bucket = MagicMock()
+        backend._bucket.set = MagicMock()
+        # should return None
+        self.assertIsNone(backend.set(sentinel.key, sentinel.value))
+
+    def test_delete(self):
+        """
+        Test get
+        RiakBackend.get
+        should return  and take two params
+        db conn to couchbase is mocked
+        TODO Should test on key not exists
+        """
+        celery = Celery(set_as_current=False)
+
+        celery.conf.CELERY_COUCHBASE_BACKEND_SETTINGS = {}
+
+        backend = RiakBackend(app=celery)
+        backend._client = Mock()
+        backend._bucket = Mock()
+        mocked_delete = backend._client.delete = Mock()
+        mocked_delete.return_value = None
+        # should return None
+        self.assertIsNone(backend.delete('1f3fab'))
+        backend._bucket.delete.assert_called_once_with('1f3fab')
+
+    def test_config_params(self):
+        """
+        test celery.conf.CELERY_RIAK_BACKEND_SETTINGS
+        celery.conf.CELERY_RIAK_BACKEND_SETTINGS
+        is properly set
+        """
+        celery = Celery(set_as_current=False)
+        celery.conf.CELERY_RIAK_BACKEND_SETTINGS = {'bucket': 'mycoolbucket',
+                                                    'host': 'there.host.com',
+                                                    'port': '1234'}
+        backend = RiakBackend(app=celery)
+        self.assertEqual(backend.bucket_name, "mycoolbucket")
+        self.assertEqual(backend.host, 'there.host.com')
+        self.assertEqual(backend.port, 1234)
+
+    def test_backend_by_url(self, url='riak://myhost/mycoolbucket'):
+        """
+        test get backend by url
+        """
+        from celery.backends.riak import RiakBackend
+        backend, url_ = backends.get_backend_by_url(url)
+        self.assertIs(backend, RiakBackend)
+        self.assertEqual(url_, url)
+
+    def test_backend_params_by_url(self):
+        """
+        test get backend params by url
+        """
+        celery = Celery(set_as_current=False,
+                        backend='riak://myhost:123/mycoolbucket')
+        backend = celery.backend
+        self.assertEqual(backend.bucket_name, "mycoolbucket")
+        self.assertEqual(backend.host, "myhost")
+        self.assertEqual(backend.port, 123)
+
+    def test_non_ASCII_bucket_raises(self):
+        """
+        test celery.conf.CELERY_RIAK_BACKEND_SETTINGS
+        celery.conf.CELERY_RIAK_BACKEND_SETTINGS
+        is properly set
+        """
+        with self.assertRaises(NonAsciiBucket):
+            celery = Celery(set_as_current=False)
+            celery.conf.CELERY_RIAK_BACKEND_SETTINGS = {
+                'bucket': 'héhé',
+                'host': 'there.host.com',
+                'port': '1234',
+            }
+            RiakBackend(app=celery)

+ 65 - 0
docs/configuration.rst

@@ -628,6 +628,71 @@ Example configuration
         'max_retries': 10
     }
 
+.. _conf-riak-result-backend:
+
+Riak backend settings
+---------------------
+
+.. note::
+
+    The Riak backend requires the :mod:`riak` library:
+    http://pypi.python.org/pypi/riak/
+
+    To install the riak package use `pip` or `easy_install`:
+
+    .. code-block:: bash
+
+        $ pip install riak
+
+This backend requires the :setting:`CELERY_RESULT_BACKEND`
+setting to be set to a Riak URL::
+
+    CELERY_RESULT_BACKEND = "riak://host:port/bucket"
+
+For example::
+
+    CELERY_RESULT_BACKEND = "riak://localhost/celery
+
+which is the same as::
+
+    CELERY_RESULT_BACKEND = "riak://"
+
+The fields of the URL is defined as folows:
+
+- *host*
+
+Host name or IP address of the Riak server. e.g. `"localhost"`.
+
+- *port*
+
+Port to the Riak server using the protobuf protocol. Default is 8087.
+
+- *bucket*
+
+Bucket name to use. Default is `celery`.
+The bucket needs to be a string with ascii characters only.
+
+Altenatively, this backend can be configured with the following configuration directives.
+
+.. setting:: CELERY_RIAK_BACKEND_SETTINGS
+
+CELERY_RIAK_BACKEND_SETTINGS
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This is a dict supporting the following keys:
+
+* host
+    The host name of the Riak server. Defaults to "localhost".
+
+* port
+    The port the Riak server is listening to. Defaults to 8087.
+
+* bucket
+    The bucket name to connect to. Defaults to "celery".
+
+* protocol
+    The protocol to use to connect to the Riak server. This is not configurable
+    via :setting:`CELERY_RESULT_BACKEND`
 
 .. _conf-ironcache-result-backend:
 

+ 3 - 0
docs/includes/installation.txt

@@ -86,6 +86,9 @@ Transports and Backends
 :celery[couchbase]:
     for using CouchBase as a result backend.
 
+:celery[riak]:
+    for using Riak as a result backend.
+
 :celery[beanstalk]:
     for using Beanstalk as a message transport (*experimental*).
 

+ 1 - 0
requirements/extras/riak.txt

@@ -0,0 +1 @@
+riak >=2.0

+ 1 - 0
requirements/test-ci.txt

@@ -1,6 +1,7 @@
 coverage>=3.0
 coveralls
 redis
+#riak >=2.0
 #pymongo
 #SQLAlchemy
 PyOpenSSL

+ 7 - 5
setup.py

@@ -155,11 +155,13 @@ if CELERY_COMPAT_PROGRAMS:
 
 extras = lambda *p: reqs('extras', *p)
 # Celery specific
-specific_list = ['auth', 'cassandra', 'memcache', 'couchbase', 'threads',
-                 'eventlet', 'gevent', 'msgpack', 'yaml', 'redis',
-                 'mongodb', 'sqs', 'couchdb', 'beanstalk', 'zookeeper',
-                 'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq']
-extras_require = dict((x, extras(x + '.txt')) for x in specific_list)
+features = {
+    'auth', 'cassandra', 'memcache', 'couchbase', 'threads',
+    'eventlet', 'gevent', 'msgpack', 'yaml', 'redis',
+    'mongodb', 'sqs', 'couchdb', 'riak', 'beanstalk', 'zookeeper',
+    'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq',
+}
+extras_require = {x: extras(x + '.txt') for x in features}
 extra['extras_require'] = extras_require
 
 # -*- %%% -*-