from __future__ import absolute_import, unicode_literals

import pytest
from case import Mock, sentinel, skip

from celery.app import backends
from celery.backends import elasticsearch as module
from celery.backends.elasticsearch import ElasticsearchBackend
from celery.exceptions import ImproperlyConfigured


@skip.unless_module('elasticsearch')
class test_ElasticsearchBackend:
    """Unit tests for ``ElasticsearchBackend`` using a mocked server."""

    def setup(self):
        self.backend = ElasticsearchBackend(app=self.app)

    def test_init_no_elasticsearch(self):
        """Constructing the backend without the client library fails."""
        saved = module.elasticsearch
        module.elasticsearch = None
        try:
            with pytest.raises(ImproperlyConfigured):
                ElasticsearchBackend(app=self.app)
        finally:
            # Always restore the real module so later tests are unaffected.
            module.elasticsearch = saved

    def test_get(self):
        """``get`` returns the stored result of a found document."""
        backend = ElasticsearchBackend(app=self.app)
        backend._server = Mock()
        backend._server.get = Mock()
        # Reply the mocked server hands back for a found document.
        found_doc = {'found': True, '_source': {'result': sentinel.result}}
        backend._server.get.return_value = found_doc

        fetched = backend.get(sentinel.task_id)

        assert fetched == sentinel.result
        backend._server.get.assert_called_once_with(
            index=backend.index,
            doc_type=backend.doc_type,
            id=sentinel.task_id,
        )

    def test_get_none(self):
        """``get`` returns None when the reply is not a found document."""
        backend = ElasticsearchBackend(app=self.app)
        backend._server = Mock()
        backend._server.get = Mock()
        backend._server.get.return_value = sentinel.result

        fetched = backend.get(sentinel.task_id)

        assert fetched is None
        backend._server.get.assert_called_once_with(
            index=backend.index,
            doc_type=backend.doc_type,
            id=sentinel.task_id,
        )

    def test_delete(self):
        """``delete`` forwards to the server and returns None."""
        backend = ElasticsearchBackend(app=self.app)
        backend._server = Mock()
        backend._server.delete = Mock()
        backend._server.delete.return_value = sentinel.result

        outcome = backend.delete(sentinel.task_id)

        assert outcome is None
        backend._server.delete.assert_called_once_with(
            index=backend.index,
            doc_type=backend.doc_type,
            id=sentinel.task_id,
        )

    def test_backend_by_url(self, url='elasticsearch://localhost:9200/index'):
        """An ``elasticsearch://`` URL resolves to this backend class."""
        resolved_cls, resolved_url = backends.by_url(url, self.app.loader)

        assert resolved_cls is ElasticsearchBackend
        assert resolved_url == url

    def test_backend_params_by_url(self):
        """Index, doc type, scheme, host and port are read from the URL."""
        url = 'elasticsearch://localhost:9200/index/doc_type'
        with self.Celery(backend=url) as app:
            backend = app.backend

            assert backend.scheme == 'elasticsearch'
            assert backend.host == 'localhost'
            assert backend.port == 9200
            assert backend.index == 'index'
            assert backend.doc_type == 'doc_type'

    def test_index(self):
        """``_index`` passes a text id to the server when given bytes."""
        backend = ElasticsearchBackend(app=self.app)
        backend.doc_type = 'test-doc-type'
        backend._server = Mock()
        backend._server.index = Mock()
        server_reply = {
            '_id': sentinel.task_id,
            '_source': {'result': sentinel.result}
        }
        backend._server.index.return_value = server_reply

        body = {"field1": "value1"}
        task_key = str(sentinel.task_id)
        backend._index(
            id=task_key.encode(),
            body=body,
            kwarg1='test1'
        )

        backend._server.index.assert_called_once_with(
            id=task_key,
            doc_type=backend.doc_type,
            index=backend.index,
            body=body,
            kwarg1='test1'
        )

    def test_index_bytes_key(self):
        """``_index`` passes text body keys to the server when given bytes."""
        backend = ElasticsearchBackend(app=self.app)
        backend.doc_type = 'test-doc-type'
        backend._server = Mock()
        backend._server.index = Mock()
        server_reply = {
            '_id': sentinel.task_id,
            '_source': {'result': sentinel.result}
        }
        backend._server.index.return_value = server_reply

        body = {b"field1": "value1"}
        task_key = str(sentinel.task_id)
        backend._index(
            id=task_key.encode(),
            body=body,
            kwarg1='test1'
        )

        backend._server.index.assert_called_once_with(
            id=task_key,
            doc_type=backend.doc_type,
            index=backend.index,
            body={"field1": "value1"},
            kwarg1='test1'
        )

    def test_config_params(self):
        """Retry and timeout settings are copied from the app config."""
        conf = self.app.conf
        conf.elasticsearch_max_retries = 10
        conf.elasticsearch_timeout = 20.0
        conf.elasticsearch_retry_on_timeout = True

        self.backend = ElasticsearchBackend(app=self.app)

        assert self.backend.es_max_retries == 10
        assert self.backend.es_timeout == 20.0
        assert self.backend.es_retry_on_timeout is True