elasticsearch.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
# -*- coding: utf-8 -*-
  2. """Elasticsearch result store backend."""
  3. from __future__ import absolute_import, unicode_literals
  4. from datetime import datetime
  5. from kombu.utils.url import _parse_url
  6. from kombu.utils.encoding import bytes_to_str
  7. from celery.exceptions import ImproperlyConfigured
  8. from celery.five import items
  9. from .base import KeyValueStoreBackend
try:
    import elasticsearch
except ImportError:
    # Optional dependency: keep this module importable without the client
    # library.  ElasticsearchBackend.__init__ checks for ``None`` and raises
    # ImproperlyConfigured in that case.
    elasticsearch = None  # noqa

# Names exported by ``from <this module> import *``.
__all__ = ('ElasticsearchBackend',)
  15. E_LIB_MISSING = """\
  16. You need to install the elasticsearch library to use the Elasticsearch \
  17. result backend.\
  18. """
  19. class ElasticsearchBackend(KeyValueStoreBackend):
  20. """Elasticsearch Backend.
  21. Raises:
  22. celery.exceptions.ImproperlyConfigured:
  23. if module :pypi:`elasticsearch` is not available.
  24. """
  25. index = 'celery'
  26. doc_type = 'backend'
  27. scheme = 'http'
  28. host = 'localhost'
  29. port = 9200
  30. es_retry_on_timeout = False
  31. es_timeout = 10
  32. es_max_retries = 3
  33. def __init__(self, url=None, *args, **kwargs):
  34. super(ElasticsearchBackend, self).__init__(*args, **kwargs)
  35. self.url = url
  36. _get = self.app.conf.get
  37. if elasticsearch is None:
  38. raise ImproperlyConfigured(E_LIB_MISSING)
  39. index = doc_type = scheme = host = port = None
  40. if url:
  41. scheme, host, port, _, _, path, _ = _parse_url(url) # noqa
  42. if path:
  43. path = path.strip('/')
  44. index, _, doc_type = path.partition('/')
  45. self.index = index or self.index
  46. self.doc_type = doc_type or self.doc_type
  47. self.scheme = scheme or self.scheme
  48. self.host = host or self.host
  49. self.port = port or self.port
  50. self.es_retry_on_timeout = (
  51. _get('elasticsearch_retry_on_timeout') or self.es_retry_on_timeout
  52. )
  53. es_timeout = _get('elasticsearch_timeout')
  54. if es_timeout is not None:
  55. self.es_timeout = es_timeout
  56. es_max_retries = _get('elasticsearch_max_retries')
  57. if es_max_retries is not None:
  58. self.es_max_retries = es_max_retries
  59. self._server = None
  60. def get(self, key):
  61. try:
  62. res = self.server.get(
  63. index=self.index,
  64. doc_type=self.doc_type,
  65. id=key,
  66. )
  67. try:
  68. if res['found']:
  69. return res['_source']['result']
  70. except (TypeError, KeyError):
  71. pass
  72. except elasticsearch.exceptions.NotFoundError:
  73. pass
  74. def set(self, key, value):
  75. try:
  76. self._index(
  77. id=key,
  78. body={
  79. 'result': value,
  80. '@timestamp': '{0}Z'.format(
  81. datetime.utcnow().isoformat()[:-3]
  82. ),
  83. },
  84. )
  85. except elasticsearch.exceptions.ConflictError:
  86. # document already exists, update it
  87. data = self.get(key)
  88. data[key] = value
  89. self._index(key, data, refresh=True)
  90. def _index(self, id, body, **kwargs):
  91. body = {bytes_to_str(k): v for k, v in items(body)}
  92. return self.server.index(
  93. id=bytes_to_str(id),
  94. index=self.index,
  95. doc_type=self.doc_type,
  96. body=body,
  97. **kwargs
  98. )
  99. def mget(self, keys):
  100. return [self.get(key) for key in keys]
  101. def delete(self, key):
  102. self.server.delete(index=self.index, doc_type=self.doc_type, id=key)
  103. def _get_server(self):
  104. """Connect to the Elasticsearch server."""
  105. return elasticsearch.Elasticsearch(
  106. '%s:%s' % (self.host, self.port),
  107. retry_on_timeout=self.es_retry_on_timeout,
  108. max_retries=self.es_max_retries,
  109. timeout=self.es_timeout
  110. )
  111. @property
  112. def server(self):
  113. if self._server is None:
  114. self._server = self._get_server()
  115. return self._server