소스 검색

elasticsearch: Fix serializing keys (#3924)

* elasticsearch: Fix serializing keys

elasticsearch requires keys to be a string, not bytes,
so make sure we convert the key to a string

* Address code review.

* Added a testcase that ensures byte keys are converted to strings.

* Fix.

* Use bytes_to_str instead of string for 2/3 compatibility

* Update unit test for bytes_to_str

* Remove five.string reference

* Correctly convert sentinel.task_id

* Convert sentinel object to bytes/str
Static 8 년 전
부모
커밋
8fba154e87
2개의 변경된 파일35개의 추가작업 그리고 5개의 파일을 삭제
  1. 4 2
      celery/backends/elasticsearch.py
  2. 31 3
      t/unit/backends/test_elasticsearch.py

+ 4 - 2
celery/backends/elasticsearch.py

@@ -3,8 +3,9 @@
 from __future__ import absolute_import, unicode_literals
 from __future__ import absolute_import, unicode_literals
 from datetime import datetime
 from datetime import datetime
 from kombu.utils.url import _parse_url
 from kombu.utils.url import _parse_url
+from kombu.utils.encoding import bytes_to_str
 from celery.exceptions import ImproperlyConfigured
 from celery.exceptions import ImproperlyConfigured
-from celery.five import string
+from celery.five import items
 from .base import KeyValueStoreBackend
 from .base import KeyValueStoreBackend
 try:
 try:
     import elasticsearch
     import elasticsearch
@@ -105,8 +106,9 @@ class ElasticsearchBackend(KeyValueStoreBackend):
             self._index(key, data, refresh=True)
             self._index(key, data, refresh=True)
 
 
     def _index(self, id, body, **kwargs):
     def _index(self, id, body, **kwargs):
+        body = {bytes_to_str(k): v for k, v in items(body)}
         return self.server.index(
         return self.server.index(
-            id=string(id),
+            id=bytes_to_str(id),
             index=self.index,
             index=self.index,
             doc_type=self.doc_type,
             doc_type=self.doc_type,
             body=body,
             body=body,

+ 31 - 3
t/unit/backends/test_elasticsearch.py

@@ -2,7 +2,6 @@ from __future__ import absolute_import, unicode_literals
 import pytest
 import pytest
 from case import Mock, sentinel, skip
 from case import Mock, sentinel, skip
 from celery.app import backends
 from celery.app import backends
-from celery.five import string
 from celery.backends import elasticsearch as module
 from celery.backends import elasticsearch as module
 from celery.backends.elasticsearch import ElasticsearchBackend
 from celery.backends.elasticsearch import ElasticsearchBackend
 from celery.exceptions import ImproperlyConfigured
 from celery.exceptions import ImproperlyConfigured
@@ -94,15 +93,44 @@ class test_ElasticsearchBackend:
         x._server.index.return_value = expected_result
         x._server.index.return_value = expected_result
 
 
         body = {"field1": "value1"}
         body = {"field1": "value1"}
-        x._index(id=sentinel.task_id, body=body, kwarg1='test1')
+        x._index(
+            id=str(sentinel.task_id).encode(),
+            body=body,
+            kwarg1='test1'
+        )
         x._server.index.assert_called_once_with(
         x._server.index.assert_called_once_with(
-            id=string(sentinel.task_id),
+            id=str(sentinel.task_id),
             doc_type=x.doc_type,
             doc_type=x.doc_type,
             index=x.index,
             index=x.index,
             body=body,
             body=body,
             kwarg1='test1'
             kwarg1='test1'
         )
         )
 
 
+    def test_index_bytes_key(self):
+        x = ElasticsearchBackend(app=self.app)
+        x.doc_type = 'test-doc-type'
+        x._server = Mock()
+        x._server.index = Mock()
+        expected_result = dict(
+            _id=sentinel.task_id,
+            _source={'result': sentinel.result}
+        )
+        x._server.index.return_value = expected_result
+
+        body = {b"field1": "value1"}
+        x._index(
+            id=str(sentinel.task_id).encode(),
+            body=body,
+            kwarg1='test1'
+        )
+        x._server.index.assert_called_once_with(
+            id=str(sentinel.task_id),
+            doc_type=x.doc_type,
+            index=x.index,
+            body={"field1": "value1"},
+            kwarg1='test1'
+        )
+
     def test_config_params(self):
     def test_config_params(self):
         self.app.conf.elasticsearch_max_retries = 10
         self.app.conf.elasticsearch_max_retries = 10
         self.app.conf.elasticsearch_timeout = 20.0
         self.app.conf.elasticsearch_timeout = 20.0