Browse Source

Cosmetics for Elasticsearch result backend (Issue #2828)

Ask Solem 9 years ago
parent
commit
9364a9ec89

+ 2 - 2
README.rst

@@ -34,7 +34,7 @@ any language.  So far there's RCelery_ for the Ruby programming language, and a
 `PHP client`, but language interoperability can also be achieved
 by using webhooks.
 
-.. _RCelery: http://leapfrogonline.github.io/rcelery/
+.. _RCelery: http://leapfrogdevelopment.github.com/rcelery/
 .. _`PHP client`: https://github.com/gjedeer/celery-php
 .. _`using webhooks`:
     http://docs.celeryproject.org/en/latest/userguide/remote-tasks.html
@@ -139,7 +139,7 @@ It supports...
         - AMQP, Redis
         - memcached, MongoDB
         - SQLAlchemy, Django ORM
-        - Apache Cassandra, IronCache
+        - Apache Cassandra, IronCache, Elasticsearch
 
     - **Serialization**
 

+ 68 - 67
celery/backends/elasticsearch.py

@@ -1,35 +1,41 @@
 # -* coding: utf-8 -*-
 """
     celery.backends.elasticsearch
-    ~~~~~~~~~~~~~~~~~~~~~~~~~
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
     Elasticsearch result store backend.
-    Based on CouchDB backend.
 
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, unicode_literals
 
-try:
-    import elasticsearch
-except ImportError:
-    elasticsearch = None  # noqa
-
-from .base import KeyValueStoreBackend
-
-import datetime
+from datetime import datetime
 
 from kombu.utils.url import _parse_url
 
 from celery.exceptions import ImproperlyConfigured
 
+from .base import KeyValueStoreBackend
+
+try:
+    import elasticsearch
+except ImportError:
+    elasticsearch = None  # noqa
+
 __all__ = ['ElasticsearchBackend']
 
-ERR_LIB_MISSING = """\
+E_LIB_MISSING = """\
 You need to install the elasticsearch library to use the Elasticsearch \
-result backend\
+result backend.\
 """
 
+
 class ElasticsearchBackend(KeyValueStoreBackend):
+    """Elasticsearch Backend.
+
+    :raises celery.exceptions.ImproperlyConfigured: if
+        module :mod:`elasticsearch` is not available.
+
+    """
 
     index = 'celery'
     doc_type = 'backend'
@@ -37,84 +43,79 @@ class ElasticsearchBackend(KeyValueStoreBackend):
     host = 'localhost'
     port = 9200
 
-
     def __init__(self, url=None, *args, **kwargs):
-        """Initialize Elasticsearch backend instance.
-
-        :raises celery.exceptions.ImproperlyConfigured: if
-            module :mod:`elasticsearch` is not available.
-
-        """
         super(ElasticsearchBackend, self).__init__(*args, **kwargs)
 
         if elasticsearch is None:
-            raise ImproperlyConfigured(ERR_LIB_MISSING)
-
-        uindex = udoc_type = uscheme = uhost = uport = None
-        
-        if url:
-            uscheme, uhost, uport, _, _, uuri, _ = _parse_url(url)  # noqa
-            uuri = uuri.strip('/') if uuri else None
-            uuris = uuri.split("/")
-            uindex = uuris[0] if len(uuris) > 0 else None
-            udoc_type = uuris[1] if len(uuris) > 1 else None
-
-        self.index = uindex or self.index
-        self.doc_type = udoc_type or self.doc_type
-        self.scheme = uscheme or self.scheme
-        self.host = uhost or self.host
-        self.port = uport or self.port
-
-        self._server = None
+            raise ImproperlyConfigured(E_LIB_MISSING)
 
+        index = doc_type = scheme = host = port = None
 
-    def _get_server(self):
-        """Connect to the Elasticsearch server."""
-        return elasticsearch.Elasticsearch(self.host)
-
+        if url:
+            scheme, host, port, _, _, path, _ = _parse_url(url)  # noqa
+            if path:
+                path = path.strip('/')
+                index, _, doc_type = path.partition('/')
 
-    @property
-    def server(self):
-        if self._server is None:
-            self._server = self._get_server()
-        return self._server
+        self.index = index or self.index
+        self.doc_type = doc_type or self.doc_type
+        self.scheme = scheme or self.scheme
+        self.host = host or self.host
+        self.port = port or self.port
 
+        self._server = None
 
     def get(self, key):
         try:
-            out = self.server.get(index=self.index,\
-                                  doc_type=self.doc_type,\
-                                  id=key)
-            if isinstance(out, dict) \
-                    and "found" in out and out["found"] \
-                    and "_source" in out and key in out["_source"]:
-                return out["_source"][key]
-            else:
-                return None
+            res = self.server.get(
+                index=self.index,
+                doc_type=self.doc_type,
+                id=key,
+            )
+            try:
+                if res['found']:
+                    return res['_source'][key]
+            except (TypeError, KeyError):
+                pass
         except elasticsearch.exceptions.NotFoundError:
-            return None
-
+            pass
 
     def set(self, key, value):
         try:
-            data = {}
-            data['@timestamp'] = "{0}Z".format(datetime.datetime.utcnow()\
-                                       .strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3])
-            data[key] = value
-            self.server.index(index=self.index, doc_type=self.doc_type,\
-                              id=key, body=data)
+            self._index(
+                id=key,
+                body={
+                    key: value,
+                    '@timestamp': '{0}Z'.format(
+                        datetime.utcnow().isoformat()[:-3]
+                    ),
+                },
+            )
         except elasticsearch.exceptions.ConflictError:
             # document already exists, update it
             data = self.get(key)
             data[key] = value
-            self.server.index(index=self.index, doc_type=self.doc_type,\
-                              id=key, body=data, refresh=True)
+            self._index(key, data, refresh=True)
 
+    def _index(self, id, body, **kwargs):
+        return self.server.index(
+            index=self.index,
+            doc_type=self.doc_type,
+            **kwargs
+        )
 
     def mget(self, keys):
         return [self.get(key) for key in keys]
 
-
     def delete(self, key):
         self.server.delete(index=self.index, doc_type=self.doc_type, id=key)
 
+    def _get_server(self):
+        """Connect to the Elasticsearch server."""
+        return elasticsearch.Elasticsearch(self.host)
+
+    @property
+    def server(self):
+        if self._server is None:
+            self._server = self._get_server()
+        return self._server

+ 21 - 18
celery/tests/backends/test_elasticsearch.py

@@ -1,12 +1,11 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, unicode_literals
 
+from celery import backends
 from celery.backends import elasticsearch as module
 from celery.backends.elasticsearch import ElasticsearchBackend
 from celery.exceptions import ImproperlyConfigured
-from celery import backends
-from celery.tests.case import (
-    AppCase, Mock, SkipTest, sentinel,
-)
+
+from celery.tests.case import AppCase, Mock, SkipTest, sentinel
 
 try:
     import elasticsearch
@@ -16,13 +15,11 @@ except ImportError:
 
 class test_ElasticsearchBackend(AppCase):
 
-
     def setup(self):
         if elasticsearch is None:
             raise SkipTest('elasticsearch is not installed.')
         self.backend = ElasticsearchBackend(app=self.app)
 
-
     def test_init_no_elasticsearch(self):
         prev, module.elasticsearch = module.elasticsearch, None
         try:
@@ -31,7 +28,6 @@ class test_ElasticsearchBackend(AppCase):
         finally:
             module.elasticsearch = prev
 
-
     def test_get(self):
         x = ElasticsearchBackend(app=self.app)
         x._server = Mock()
@@ -42,19 +38,25 @@ class test_ElasticsearchBackend(AppCase):
         dict_result = x.get(sentinel.task_id)
 
         self.assertEqual(dict_result, sentinel.result)
-        x._server.get.assert_called_once_with(doc_type=x.doc_type, id=sentinel.task_id, index=x.index)
-
+        x._server.get.assert_called_once_with(
+            doc_type=x.doc_type,
+            id=sentinel.task_id,
+            index=x.index,
+        )
 
     def test_get_none(self):
         x = ElasticsearchBackend(app=self.app)
         x._server = Mock()
         x._server.get = Mock()
         x._server.get.return_value = sentinel.result
-        none_reusult = x.get(sentinel.task_id)
-
-        self.assertEqual(none_reusult, None)
-        x._server.get.assert_called_once_with(doc_type=x.doc_type, id=sentinel.task_id, index=x.index)
+        none_result = x.get(sentinel.task_id)
 
+        self.assertEqual(none_result, None)
+        x._server.get.assert_called_once_with(
+            doc_type=x.doc_type,
+            id=sentinel.task_id,
+            index=x.index,
+        )
 
     def test_delete(self):
         x = ElasticsearchBackend(app=self.app)
@@ -63,8 +65,11 @@ class test_ElasticsearchBackend(AppCase):
         x._server.delete.return_value = sentinel.result
 
         self.assertIsNone(x.delete(sentinel.task_id), sentinel.result)
-        x._server.delete.assert_called_once_with(doc_type=x.doc_type, id=sentinel.task_id, index=x.index)
-
+        x._server.delete.assert_called_once_with(
+            doc_type=x.doc_type,
+            id=sentinel.task_id,
+            index=x.index,
+        )
 
     def test_backend_by_url(self, url='elasticsearch://localhost:9200/index'):
         backend, url_ = backends.get_backend_by_url(url, self.app.loader)
@@ -72,7 +77,6 @@ class test_ElasticsearchBackend(AppCase):
         self.assertIs(backend, ElasticsearchBackend)
         self.assertEqual(url_, url)
 
-
     def test_backend_params_by_url(self):
         url = 'elasticsearch://localhost:9200/index/doc_type'
         with self.Celery(backend=url) as app:
@@ -83,4 +87,3 @@ class test_ElasticsearchBackend(AppCase):
             self.assertEqual(x.scheme, 'elasticsearch')
             self.assertEqual(x.host, 'localhost')
             self.assertEqual(x.port, 9200)
-

+ 20 - 0
docs/configuration.rst

@@ -511,6 +511,10 @@ Can be one of the following:
     Use `Cassandra`_ to store the results.
     See :ref:`conf-cassandra-result-backend`.
 
+* elasticsearch
+    Use `Elasticsearch`_ to store the results.
+    See :ref:`conf-elasticsearch-result-backend`.
+
 * ironcache
     Use `IronCache`_ to store the results.
     See :ref:`conf-ironcache-result-backend`.
@@ -541,6 +545,7 @@ Can be one of the following:
 .. _`MongoDB`: http://mongodb.org
 .. _`Redis`: http://redis.io
 .. _`Cassandra`: http://cassandra.apache.org/
+.. _`Elasticsearch`: https://aws.amazon.com/elasticsearch-service/
 .. _`IronCache`: http://www.iron.io/cache
 .. _`CouchDB`: http://www.couchdb.com/
 .. _`Couchbase`: http://www.couchbase.com/
@@ -1002,6 +1007,21 @@ Example configuration
     cassandra_write_consistency = 'ONE'
     cassandra_entry_ttl = 86400
 
+.. _conf-elasticsearch-result-backend:
+
+Elasticsearch backend settings
+------------------------------
+
+To use `Elasticsearch`_ as the result backend you simply need to
+configure the :setting:`result_backend` setting with the correct URL.
+
+Example configuration
+~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: python
+
+    result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'
+
 .. _conf-riak-result-backend:
 
 Riak backend settings

+ 1 - 1
docs/getting-started/introduction.rst

@@ -134,7 +134,7 @@ Celery is…
             - AMQP, Redis
             - memcached, MongoDB
             - SQLAlchemy, Django ORM
-            - Apache Cassandra
+            - Apache Cassandra, IronCache, Elasticsearch
 
         - **Serialization**
 

+ 3 - 0
docs/includes/installation.txt

@@ -86,6 +86,9 @@ Transports and Backends
 :celery[couchbase]:
     for using CouchBase as a result backend.
 
+:celery[elasticsearch]:
+    for using Elasticsearch as a result backend.
+
 :celery[riak]:
     for using Riak as a result backend.
 

+ 1 - 1
docs/includes/introduction.txt

@@ -133,7 +133,7 @@ It supports…
         - AMQP, Redis
         - memcached, MongoDB
         - SQLAlchemy, Django ORM
-        - Apache Cassandra, IronCache
+        - Apache Cassandra, IronCache, Elasticsearch
 
     - **Serialization**
 

+ 11 - 0
docs/internals/reference/celery.backends.elasticsearch.txt

@@ -0,0 +1,11 @@
+===========================================
+ celery.backends.elasticsearch
+===========================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.backends.elasticsearch
+
+.. automodule:: celery.backends.elasticsearch
+    :members:
+    :undoc-members:

+ 1 - 0
docs/internals/reference/index.rst

@@ -30,6 +30,7 @@
     celery.backends.cache
     celery.backends.couchdb
     celery.backends.mongodb
+    celery.backends.elasticsearch
     celery.backends.redis
     celery.backends.riak
     celery.backends.cassandra

+ 1 - 1
setup.py

@@ -196,7 +196,7 @@ def extras(*p):
 
 # Celery specific
 features = set([
-    'auth', 'cassandra', 'memcache', 'couchbase', 'threads',
+    'auth', 'cassandra', 'elasticsearch', 'memcache', 'couchbase', 'threads',
     'eventlet', 'gevent', 'msgpack', 'yaml', 'redis',
     'mongodb', 'sqs', 'couchdb', 'riak', 'beanstalk', 'zookeeper',
     'zeromq', 'sqlalchemy', 'librabbitmq', 'pyro', 'slmq',