test_elasticsearch.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, sentinel, skip
  4. from celery.app import backends
  5. from celery.five import string
  6. from celery.backends import elasticsearch as module
  7. from celery.backends.elasticsearch import ElasticsearchBackend
  8. from celery.exceptions import ImproperlyConfigured
  9. @skip.unless_module('elasticsearch')
  10. class test_ElasticsearchBackend:
  11. def setup(self):
  12. self.backend = ElasticsearchBackend(app=self.app)
  13. def test_init_no_elasticsearch(self):
  14. prev, module.elasticsearch = module.elasticsearch, None
  15. try:
  16. with pytest.raises(ImproperlyConfigured):
  17. ElasticsearchBackend(app=self.app)
  18. finally:
  19. module.elasticsearch = prev
  20. def test_get(self):
  21. x = ElasticsearchBackend(app=self.app)
  22. x._server = Mock()
  23. x._server.get = Mock()
  24. # expected result
  25. r = dict(found=True, _source={'result': sentinel.result})
  26. x._server.get.return_value = r
  27. dict_result = x.get(sentinel.task_id)
  28. assert dict_result == sentinel.result
  29. x._server.get.assert_called_once_with(
  30. doc_type=x.doc_type,
  31. id=sentinel.task_id,
  32. index=x.index,
  33. )
  34. def test_get_none(self):
  35. x = ElasticsearchBackend(app=self.app)
  36. x._server = Mock()
  37. x._server.get = Mock()
  38. x._server.get.return_value = sentinel.result
  39. none_result = x.get(sentinel.task_id)
  40. assert none_result is None
  41. x._server.get.assert_called_once_with(
  42. doc_type=x.doc_type,
  43. id=sentinel.task_id,
  44. index=x.index,
  45. )
  46. def test_delete(self):
  47. x = ElasticsearchBackend(app=self.app)
  48. x._server = Mock()
  49. x._server.delete = Mock()
  50. x._server.delete.return_value = sentinel.result
  51. assert x.delete(sentinel.task_id) is None
  52. x._server.delete.assert_called_once_with(
  53. doc_type=x.doc_type,
  54. id=sentinel.task_id,
  55. index=x.index,
  56. )
  57. def test_backend_by_url(self, url='elasticsearch://localhost:9200/index'):
  58. backend, url_ = backends.by_url(url, self.app.loader)
  59. assert backend is ElasticsearchBackend
  60. assert url_ == url
  61. def test_backend_params_by_url(self):
  62. url = 'elasticsearch://localhost:9200/index/doc_type'
  63. with self.Celery(backend=url) as app:
  64. x = app.backend
  65. assert x.index == 'index'
  66. assert x.doc_type == 'doc_type'
  67. assert x.scheme == 'elasticsearch'
  68. assert x.host == 'localhost'
  69. assert x.port == 9200
  70. def test_index(self):
  71. x = ElasticsearchBackend(app=self.app)
  72. x.doc_type = 'test-doc-type'
  73. x._server = Mock()
  74. x._server.index = Mock()
  75. expected_result = dict(
  76. _id=sentinel.task_id,
  77. _source={'result': sentinel.result}
  78. )
  79. x._server.index.return_value = expected_result
  80. body = {"field1": "value1"}
  81. x._index(id=sentinel.task_id, body=body, kwarg1='test1')
  82. x._server.index.assert_called_once_with(
  83. id=string(sentinel.task_id),
  84. doc_type=x.doc_type,
  85. index=x.index,
  86. body=body,
  87. kwarg1='test1'
  88. )
  89. def test_config_params(self):
  90. self.app.conf.elasticsearch_max_retries = 10
  91. self.app.conf.elasticsearch_timeout = 20.0
  92. self.app.conf.elasticsearch_retry_on_timeout = True
  93. self.backend = ElasticsearchBackend(app=self.app)
  94. assert self.backend.es_max_retries == 10
  95. assert self.backend.es_timeout == 20.0
  96. assert self.backend.es_retry_on_timeout is True