test_elasticsearch.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, sentinel, skip
  4. from celery.app import backends
  5. from celery.backends import elasticsearch as module
  6. from celery.backends.elasticsearch import ElasticsearchBackend
  7. from celery.exceptions import ImproperlyConfigured
  8. @skip.unless_module('elasticsearch')
  9. class test_ElasticsearchBackend:
  10. def setup(self):
  11. self.backend = ElasticsearchBackend(app=self.app)
  12. def test_init_no_elasticsearch(self):
  13. prev, module.elasticsearch = module.elasticsearch, None
  14. try:
  15. with pytest.raises(ImproperlyConfigured):
  16. ElasticsearchBackend(app=self.app)
  17. finally:
  18. module.elasticsearch = prev
  19. def test_get(self):
  20. x = ElasticsearchBackend(app=self.app)
  21. x._server = Mock()
  22. x._server.get = Mock()
  23. # expected result
  24. r = {'found': True, '_source': {'result': sentinel.result}}
  25. x._server.get.return_value = r
  26. dict_result = x.get(sentinel.task_id)
  27. assert dict_result == sentinel.result
  28. x._server.get.assert_called_once_with(
  29. doc_type=x.doc_type,
  30. id=sentinel.task_id,
  31. index=x.index,
  32. )
  33. def test_get_none(self):
  34. x = ElasticsearchBackend(app=self.app)
  35. x._server = Mock()
  36. x._server.get = Mock()
  37. x._server.get.return_value = sentinel.result
  38. none_result = x.get(sentinel.task_id)
  39. assert none_result is None
  40. x._server.get.assert_called_once_with(
  41. doc_type=x.doc_type,
  42. id=sentinel.task_id,
  43. index=x.index,
  44. )
  45. def test_delete(self):
  46. x = ElasticsearchBackend(app=self.app)
  47. x._server = Mock()
  48. x._server.delete = Mock()
  49. x._server.delete.return_value = sentinel.result
  50. assert x.delete(sentinel.task_id) is None
  51. x._server.delete.assert_called_once_with(
  52. doc_type=x.doc_type,
  53. id=sentinel.task_id,
  54. index=x.index,
  55. )
  56. def test_backend_by_url(self, url='elasticsearch://localhost:9200/index'):
  57. backend, url_ = backends.by_url(url, self.app.loader)
  58. assert backend is ElasticsearchBackend
  59. assert url_ == url
  60. def test_backend_params_by_url(self):
  61. url = 'elasticsearch://localhost:9200/index/doc_type'
  62. with self.Celery(backend=url) as app:
  63. x = app.backend
  64. assert x.index == 'index'
  65. assert x.doc_type == 'doc_type'
  66. assert x.scheme == 'elasticsearch'
  67. assert x.host == 'localhost'
  68. assert x.port == 9200
  69. def test_index(self):
  70. x = ElasticsearchBackend(app=self.app)
  71. x.doc_type = 'test-doc-type'
  72. x._server = Mock()
  73. x._server.index = Mock()
  74. expected_result = {
  75. '_id': sentinel.task_id,
  76. '_source': {'result': sentinel.result}
  77. }
  78. x._server.index.return_value = expected_result
  79. body = {"field1": "value1"}
  80. x._index(
  81. id=str(sentinel.task_id).encode(),
  82. body=body,
  83. kwarg1='test1'
  84. )
  85. x._server.index.assert_called_once_with(
  86. id=str(sentinel.task_id),
  87. doc_type=x.doc_type,
  88. index=x.index,
  89. body=body,
  90. kwarg1='test1'
  91. )
  92. def test_index_bytes_key(self):
  93. x = ElasticsearchBackend(app=self.app)
  94. x.doc_type = 'test-doc-type'
  95. x._server = Mock()
  96. x._server.index = Mock()
  97. expected_result = {
  98. '_id': sentinel.task_id,
  99. '_source': {'result': sentinel.result}
  100. }
  101. x._server.index.return_value = expected_result
  102. body = {b"field1": "value1"}
  103. x._index(
  104. id=str(sentinel.task_id).encode(),
  105. body=body,
  106. kwarg1='test1'
  107. )
  108. x._server.index.assert_called_once_with(
  109. id=str(sentinel.task_id),
  110. doc_type=x.doc_type,
  111. index=x.index,
  112. body={"field1": "value1"},
  113. kwarg1='test1'
  114. )
  115. def test_config_params(self):
  116. self.app.conf.elasticsearch_max_retries = 10
  117. self.app.conf.elasticsearch_timeout = 20.0
  118. self.app.conf.elasticsearch_retry_on_timeout = True
  119. self.backend = ElasticsearchBackend(app=self.app)
  120. assert self.backend.es_max_retries == 10
  121. assert self.backend.es_timeout == 20.0
  122. assert self.backend.es_retry_on_timeout is True