# test_elasticsearch.py (2.6 KB)
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, sentinel, skip
  4. from celery import backends
  5. from celery.backends import elasticsearch as module
  6. from celery.backends.elasticsearch import ElasticsearchBackend
  7. from celery.exceptions import ImproperlyConfigured
  8. @skip.unless_module('elasticsearch')
  9. class test_ElasticsearchBackend:
  10. def setup(self):
  11. self.backend = ElasticsearchBackend(app=self.app)
  12. def test_init_no_elasticsearch(self):
  13. prev, module.elasticsearch = module.elasticsearch, None
  14. try:
  15. with pytest.raises(ImproperlyConfigured):
  16. ElasticsearchBackend(app=self.app)
  17. finally:
  18. module.elasticsearch = prev
  19. def test_get(self):
  20. x = ElasticsearchBackend(app=self.app)
  21. x._server = Mock()
  22. x._server.get = Mock()
  23. # expected result
  24. r = dict(found=True, _source={sentinel.task_id: sentinel.result})
  25. x._server.get.return_value = r
  26. dict_result = x.get(sentinel.task_id)
  27. assert dict_result == sentinel.result
  28. x._server.get.assert_called_once_with(
  29. doc_type=x.doc_type,
  30. id=sentinel.task_id,
  31. index=x.index,
  32. )
  33. def test_get_none(self):
  34. x = ElasticsearchBackend(app=self.app)
  35. x._server = Mock()
  36. x._server.get = Mock()
  37. x._server.get.return_value = sentinel.result
  38. none_result = x.get(sentinel.task_id)
  39. assert none_result is None
  40. x._server.get.assert_called_once_with(
  41. doc_type=x.doc_type,
  42. id=sentinel.task_id,
  43. index=x.index,
  44. )
  45. def test_delete(self):
  46. x = ElasticsearchBackend(app=self.app)
  47. x._server = Mock()
  48. x._server.delete = Mock()
  49. x._server.delete.return_value = sentinel.result
  50. assert x.delete(sentinel.task_id) is None
  51. x._server.delete.assert_called_once_with(
  52. doc_type=x.doc_type,
  53. id=sentinel.task_id,
  54. index=x.index,
  55. )
  56. def test_backend_by_url(self, url='elasticsearch://localhost:9200/index'):
  57. backend, url_ = backends.get_backend_by_url(url, self.app.loader)
  58. assert backend is ElasticsearchBackend
  59. assert url_ == url
  60. def test_backend_params_by_url(self):
  61. url = 'elasticsearch://localhost:9200/index/doc_type'
  62. with self.Celery(backend=url) as app:
  63. x = app.backend
  64. assert x.index == 'index'
  65. assert x.doc_type == 'doc_type'
  66. assert x.scheme == 'elasticsearch'
  67. assert x.host == 'localhost'
  68. assert x.port == 9200