elasticsearch.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # -*- coding: utf-8 -*-
  2. """Elasticsearch result store backend."""
  3. from __future__ import absolute_import, unicode_literals
  4. from datetime import datetime
  5. from kombu.utils.url import _parse_url
  6. from celery.exceptions import ImproperlyConfigured
  7. from celery.five import string
  8. from .base import KeyValueStoreBackend
  try:
      import elasticsearch
  except ImportError:
      # Optional dependency: ElasticsearchBackend.__init__ raises
      # ImproperlyConfigured(E_LIB_MISSING) when this is None.
      elasticsearch = None  # noqa

  __all__ = ['ElasticsearchBackend']

  # Message for the ImproperlyConfigured error raised when the
  # elasticsearch client library cannot be imported.
  E_LIB_MISSING = (
      'You need to install the elasticsearch library to use the '
      'Elasticsearch result backend.'
  )
  class ElasticsearchBackend(KeyValueStoreBackend):
      """Elasticsearch Backend.

      Each result is stored as a document ``{'result': value,
      '@timestamp': ...}`` in ``index``/``doc_type``, using the result key
      (converted to text) as the document id.

      A URL of the form ``scheme://host:port/index/doc_type`` may override
      the class-level connection and index defaults; any part missing from
      the URL keeps its default.  Retry/timeout settings come from the
      ``elasticsearch_retry_on_timeout``, ``elasticsearch_timeout`` and
      ``elasticsearch_max_retries`` app configuration keys.

      Raises:
          celery.exceptions.ImproperlyConfigured:
              if module :pypi:`elasticsearch` is not available.
      """

      index = 'celery'
      doc_type = 'backend'
      scheme = 'http'
      host = 'localhost'
      port = 9200
      es_retry_on_timeout = False
      es_timeout = 10
      es_max_retries = 3

      def __init__(self, url=None, *args, **kwargs):
          super(ElasticsearchBackend, self).__init__(*args, **kwargs)
          self.url = url
          _get = self.app.conf.get

          if elasticsearch is None:
              raise ImproperlyConfigured(E_LIB_MISSING)

          index = doc_type = scheme = host = port = None

          if url:
              scheme, host, port, _, _, path, _ = _parse_url(url)  # noqa
              if path:
                  # "/index/doc_type" -> ("index", "doc_type"); either may be
                  # empty, in which case the class default is kept below.
                  path = path.strip('/')
                  index, _, doc_type = path.partition('/')

          self.index = index or self.index
          self.doc_type = doc_type or self.doc_type
          # NOTE(review): scheme is stored but not used by _get_server().
          self.scheme = scheme or self.scheme
          self.host = host or self.host
          self.port = port or self.port

          self.es_retry_on_timeout = (
              _get('elasticsearch_retry_on_timeout') or self.es_retry_on_timeout
          )

          # Explicit None checks so that 0 is a valid configured value.
          es_timeout = _get('elasticsearch_timeout')
          if es_timeout is not None:
              self.es_timeout = es_timeout

          es_max_retries = _get('elasticsearch_max_retries')
          if es_max_retries is not None:
              self.es_max_retries = es_max_retries

          # Client is created lazily by the ``server`` property.
          self._server = None

      @staticmethod
      def _key_to_str(key):
          """Return *key* as the text used for the Elasticsearch document id.

          Byte keys are decoded rather than passed to ``string()``: assuming
          ``celery.five.string`` is ``str`` on Python 3, ``string(b'k')``
          would yield ``"b'k'"`` and the stored id would no longer match the
          id used by ``get``/``delete``.  All three paths use this helper so
          the ids always agree.
          """
          if isinstance(key, bytes):
              return key.decode('utf-8')
          return string(key)

      @staticmethod
      def _utc_timestamp(now=None):
          """Return *now* (default: current UTC time) as ``...THH:MM:SS.mmmZ``.

          ``strftime('%f')`` always emits six microsecond digits, so trimming
          three yields milliseconds.  ``isoformat()`` omits the fraction when
          ``microsecond == 0``, which made the previous ``[:-3]`` slice cut
          off the seconds instead.
          """
          if now is None:
              now = datetime.utcnow()
          return '{0}Z'.format(now.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3])

      def get(self, key):
          """Return the stored result for *key*, or ``None`` if there is none.

          A missing document (``NotFoundError``), a response whose ``found``
          flag is false, or a response lacking the expected fields are all
          treated as "no result".
          """
          try:
              res = self.server.get(
                  index=self.index,
                  doc_type=self.doc_type,
                  id=self._key_to_str(key),
              )
          except elasticsearch.exceptions.NotFoundError:
              return None
          try:
              if res['found']:
                  return res['_source']['result']
          except (TypeError, KeyError):
              # Unexpected response shape: treat as a miss.
              pass
          return None

      def set(self, key, value):
          """Store *value* under *key* together with a UTC ``@timestamp``."""
          body = {
              'result': value,
              '@timestamp': self._utc_timestamp(),
          }
          try:
              self._index(id=key, body=body)
          except elasticsearch.exceptions.ConflictError:
              # Document already exists: overwrite it with the new result.
              # (Previously this re-read the *result value* via get() and
              # indexed that, which raised or stored the wrong body.)
              self._index(id=key, body=body, refresh=True)

      def _index(self, id, body, **kwargs):
          """Index *body* as document *id*; extra kwargs go to the client."""
          return self.server.index(
              id=self._key_to_str(id),
              index=self.index,
              doc_type=self.doc_type,
              body=body,
              **kwargs
          )

      def mget(self, keys):
          """Return the results for *keys*, in order, via one ``get`` each."""
          return [self.get(key) for key in keys]

      def delete(self, key):
          """Delete the document stored under *key*."""
          self.server.delete(
              index=self.index,
              doc_type=self.doc_type,
              id=self._key_to_str(key),
          )

      def _get_server(self):
          """Connect to the Elasticsearch server."""
          return elasticsearch.Elasticsearch(
              '%s:%s' % (self.host, self.port),
              retry_on_timeout=self.es_retry_on_timeout,
              max_retries=self.es_max_retries,
              timeout=self.es_timeout
          )

      @property
      def server(self):
          """Return the Elasticsearch client, creating it on first use."""
          if self._server is None:
              self._server = self._get_server()
          return self._server