Browse Source

Add Elasticsearch Backend

Ahmet Demir 9 years ago
parent
commit
4a806a63c5

+ 1 - 0
celery/backends/__init__.py

@@ -31,6 +31,7 @@ BACKEND_ALIASES = {
     'mongodb': 'celery.backends.mongodb:MongoBackend',
     'db': 'celery.backends.database:DatabaseBackend',
     'database': 'celery.backends.database:DatabaseBackend',
+    'elasticsearch': 'celery.backends.elasticsearch:ElasticsearchBackend',
     'cassandra': 'celery.backends.cassandra:CassandraBackend',
     'couchbase': 'celery.backends.couchbase:CouchBaseBackend',
     'couchdb': 'celery.backends.couchdb:CouchDBBackend',

+ 120 - 0
celery/backends/elasticsearch.py

@@ -0,0 +1,120 @@
+# -* coding: utf-8 -*-
+"""
+    celery.backends.elasticsearch
+    ~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    Elasticsearch result store backend.
+    Based on CouchDB backend.
+
+"""
+from __future__ import absolute_import
+
+try:
+    import elasticsearch
+except ImportError:
+    elasticsearch = None  # noqa
+
+from .base import KeyValueStoreBackend
+
+import datetime
+
+from kombu.utils.url import _parse_url
+
+from celery.exceptions import ImproperlyConfigured
+
+__all__ = ['ElasticsearchBackend']
+
+ERR_LIB_MISSING = """\
+You need to install the elasticsearch library to use the Elasticsearch \
+result backend\
+"""
+
+class ElasticsearchBackend(KeyValueStoreBackend):
+
+    index = 'celery'
+    doc_type = 'backend'
+    scheme = 'http'
+    host = 'localhost'
+    port = 9200
+
+
+    def __init__(self, url=None, *args, **kwargs):
+        """Initialize Elasticsearch backend instance.
+
+        :raises celery.exceptions.ImproperlyConfigured: if
+            module :mod:`elasticsearch` is not available.
+
+        """
+        super(ElasticsearchBackend, self).__init__(*args, **kwargs)
+
+        if elasticsearch is None:
+            raise ImproperlyConfigured(ERR_LIB_MISSING)
+
+        uindex = udoc_type = uscheme = uhost = uport = None
+        
+        if url:
+            uscheme, uhost, uport, _, _, uuri, _ = _parse_url(url)  # noqa
+            uuri = uuri.strip('/') if uuri else None
+            uuris = uuri.split("/")
+            uindex = uuris[0] if len(uuris) > 0 else None
+            udoc_type = uuris[1] if len(uuris) > 1 else None
+
+        self.index = uindex or self.index
+        self.doc_type = udoc_type or self.doc_type
+        self.scheme = uscheme or self.scheme
+        self.host = uhost or self.host
+        self.port = uport or self.port
+
+        self._server = None
+
+
+    def _get_server(self):
+        """Connect to the Elasticsearch server."""
+        return elasticsearch.Elasticsearch(self.host)
+
+
+    @property
+    def server(self):
+        if self._server is None:
+            self._server = self._get_server()
+        return self._server
+
+
+    def get(self, key):
+        try:
+            out = self.server.get(index=self.index,\
+                                  doc_type=self.doc_type,\
+                                  id=key)
+            if isinstance(out, dict) \
+                    and "found" in out and out["found"] \
+                    and "_source" in out and key in out["_source"]:
+                return out["_source"][key]
+            else:
+                return None
+        except elasticsearch.exceptions.NotFoundError:
+            return None
+
+
+    def set(self, key, value):
+        try:
+            data = {}
+            data['@timestamp'] = "{0}Z".format(datetime.datetime.utcnow()\
+                                       .strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3])
+            data[key] = value
+            self.server.index(index=self.index, doc_type=self.doc_type,\
+                              id=key, body=data)
+        except elasticsearch.exceptions.ConflictError:
+            # document already exists, update it
+            data = self.get(key)
+            data[key] = value
+            self.server.index(index=self.index, doc_type=self.doc_type,\
+                              id=key, body=data, refresh=True)
+
+
+    def mget(self, keys):
+        return [self.get(key) for key in keys]
+
+
+    def delete(self, key):
+        self.server.delete(index=self.index, doc_type=self.doc_type, id=key)
+

+ 86 - 0
celery/tests/backends/test_elasticsearch.py

@@ -0,0 +1,86 @@
+from __future__ import absolute_import
+
+from celery.backends import elasticsearch as module
+from celery.backends.elasticsearch import ElasticsearchBackend
+from celery.exceptions import ImproperlyConfigured
+from celery import backends
+from celery.tests.case import (
+    AppCase, Mock, SkipTest, sentinel,
+)
+
+try:
+    import elasticsearch
+except ImportError:
+    elasticsearch = None
+
+
+class test_ElasticsearchBackend(AppCase):
+
+
+    def setup(self):
+        if elasticsearch is None:
+            raise SkipTest('elasticsearch is not installed.')
+        self.backend = ElasticsearchBackend(app=self.app)
+
+
+    def test_init_no_elasticsearch(self):
+        prev, module.elasticsearch = module.elasticsearch, None
+        try:
+            with self.assertRaises(ImproperlyConfigured):
+                ElasticsearchBackend(app=self.app)
+        finally:
+            module.elasticsearch = prev
+
+
+    def test_get(self):
+        x = ElasticsearchBackend(app=self.app)
+        x._server = Mock()
+        x._server.get = Mock()
+        # expected result
+        r = dict(found=True, _source={sentinel.task_id: sentinel.result})
+        x._server.get.return_value = r
+        dict_result = x.get(sentinel.task_id)
+
+        self.assertEqual(dict_result, sentinel.result)
+        x._server.get.assert_called_once_with(doc_type=x.doc_type, id=sentinel.task_id, index=x.index)
+
+
+    def test_get_none(self):
+        x = ElasticsearchBackend(app=self.app)
+        x._server = Mock()
+        x._server.get = Mock()
+        x._server.get.return_value = sentinel.result
+        none_reusult = x.get(sentinel.task_id)
+
+        self.assertEqual(none_reusult, None)
+        x._server.get.assert_called_once_with(doc_type=x.doc_type, id=sentinel.task_id, index=x.index)
+
+
+    def test_delete(self):
+        x = ElasticsearchBackend(app=self.app)
+        x._server = Mock()
+        x._server.delete = Mock()
+        x._server.delete.return_value = sentinel.result
+
+        self.assertIsNone(x.delete(sentinel.task_id), sentinel.result)
+        x._server.delete.assert_called_once_with(doc_type=x.doc_type, id=sentinel.task_id, index=x.index)
+
+
+    def test_backend_by_url(self, url='elasticsearch://localhost:9200/index'):
+        backend, url_ = backends.get_backend_by_url(url, self.app.loader)
+
+        self.assertIs(backend, ElasticsearchBackend)
+        self.assertEqual(url_, url)
+
+
+    def test_backend_params_by_url(self):
+        url = 'elasticsearch://localhost:9200/index/doc_type'
+        with self.Celery(backend=url) as app:
+            x = app.backend
+
+            self.assertEqual(x.index, 'index')
+            self.assertEqual(x.doc_type, 'doc_type')
+            self.assertEqual(x.scheme, 'elasticsearch')
+            self.assertEqual(x.host, 'localhost')
+            self.assertEqual(x.port, 9200)
+

+ 1 - 0
requirements/extras/elasticsearch.txt

@@ -0,0 +1 @@
+elasticsearch