test_elasticsearch.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import pytest
  2. from case import Mock, sentinel, skip
  3. from celery.app import backends
  4. from celery.backends import elasticsearch as module
  5. from celery.backends.elasticsearch import ElasticsearchBackend
  6. from celery.exceptions import ImproperlyConfigured
  7. @skip.unless_module('elasticsearch')
  8. class test_ElasticsearchBackend:
  9. def setup(self):
  10. self.backend = ElasticsearchBackend(app=self.app)
  11. def test_init_no_elasticsearch(self):
  12. prev, module.elasticsearch = module.elasticsearch, None
  13. try:
  14. with pytest.raises(ImproperlyConfigured):
  15. ElasticsearchBackend(app=self.app)
  16. finally:
  17. module.elasticsearch = prev
  18. def test_get(self):
  19. x = ElasticsearchBackend(app=self.app)
  20. x._server = Mock()
  21. x._server.get = Mock()
  22. # expected result
  23. r = dict(found=True, _source={sentinel.task_id: sentinel.result})
  24. x._server.get.return_value = r
  25. dict_result = x.get(sentinel.task_id)
  26. assert dict_result == sentinel.result
  27. x._server.get.assert_called_once_with(
  28. doc_type=x.doc_type,
  29. id=sentinel.task_id,
  30. index=x.index,
  31. )
  32. def test_get_none(self):
  33. x = ElasticsearchBackend(app=self.app)
  34. x._server = Mock()
  35. x._server.get = Mock()
  36. x._server.get.return_value = sentinel.result
  37. none_result = x.get(sentinel.task_id)
  38. assert none_result is None
  39. x._server.get.assert_called_once_with(
  40. doc_type=x.doc_type,
  41. id=sentinel.task_id,
  42. index=x.index,
  43. )
  44. def test_delete(self):
  45. x = ElasticsearchBackend(app=self.app)
  46. x._server = Mock()
  47. x._server.delete = Mock()
  48. x._server.delete.return_value = sentinel.result
  49. assert x.delete(sentinel.task_id) is None
  50. x._server.delete.assert_called_once_with(
  51. doc_type=x.doc_type,
  52. id=sentinel.task_id,
  53. index=x.index,
  54. )
  55. def test_backend_by_url(self, url='elasticsearch://localhost:9200/index'):
  56. backend, url_ = backends.by_url(url, self.app.loader)
  57. assert backend is ElasticsearchBackend
  58. assert url_ == url
  59. def test_backend_params_by_url(self):
  60. url = 'elasticsearch://localhost:9200/index/doc_type'
  61. with self.Celery(backend=url) as app:
  62. x = app.backend
  63. assert x.index == 'index'
  64. assert x.doc_type == 'doc_type'
  65. assert x.scheme == 'elasticsearch'
  66. assert x.host == 'localhost'
  67. assert x.port == 9200