elasticsearch.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. # -* coding: utf-8 -*-
  2. """Elasticsearch result store backend."""
  3. from __future__ import absolute_import, unicode_literals
  4. from datetime import datetime
  5. from kombu.utils.url import _parse_url
  6. from celery.exceptions import ImproperlyConfigured
  7. from .base import KeyValueStoreBackend
  8. try:
  9. import elasticsearch
  10. except ImportError:
  11. elasticsearch = None # noqa
  12. __all__ = ['ElasticsearchBackend']
  13. E_LIB_MISSING = """\
  14. You need to install the elasticsearch library to use the Elasticsearch \
  15. result backend.\
  16. """
  17. class ElasticsearchBackend(KeyValueStoreBackend):
  18. """Elasticsearch Backend.
  19. Raises:
  20. celery.exceptions.ImproperlyConfigured:
  21. if module :pypi:`elasticsearch` is not available.
  22. """
  23. index = 'celery'
  24. doc_type = 'backend'
  25. scheme = 'http'
  26. host = 'localhost'
  27. port = 9200
  28. def __init__(self, url=None, *args, **kwargs):
  29. super(ElasticsearchBackend, self).__init__(*args, **kwargs)
  30. self.url = url
  31. if elasticsearch is None:
  32. raise ImproperlyConfigured(E_LIB_MISSING)
  33. index = doc_type = scheme = host = port = None
  34. if url:
  35. scheme, host, port, _, _, path, _ = _parse_url(url) # noqa
  36. if path:
  37. path = path.strip('/')
  38. index, _, doc_type = path.partition('/')
  39. self.index = index or self.index
  40. self.doc_type = doc_type or self.doc_type
  41. self.scheme = scheme or self.scheme
  42. self.host = host or self.host
  43. self.port = port or self.port
  44. self._server = None
  45. def get(self, key):
  46. try:
  47. res = self.server.get(
  48. index=self.index,
  49. doc_type=self.doc_type,
  50. id=key,
  51. )
  52. try:
  53. if res['found']:
  54. return res['_source']['result']
  55. except (TypeError, KeyError):
  56. pass
  57. except elasticsearch.exceptions.NotFoundError:
  58. pass
  59. def set(self, key, value):
  60. try:
  61. self._index(
  62. id=key,
  63. body={
  64. 'result': value,
  65. '@timestamp': '{0}Z'.format(
  66. datetime.utcnow().isoformat()[:-3]
  67. ),
  68. },
  69. )
  70. except elasticsearch.exceptions.ConflictError:
  71. # document already exists, update it
  72. data = self.get(key)
  73. data[key] = value
  74. self._index(key, data, refresh=True)
  75. def _index(self, id, body, **kwargs):
  76. return self.server.index(
  77. id=id,
  78. index=self.index,
  79. doc_type=self.doc_type,
  80. body=body,
  81. **kwargs
  82. )
  83. def mget(self, keys):
  84. return [self.get(key) for key in keys]
  85. def delete(self, key):
  86. self.server.delete(index=self.index, doc_type=self.doc_type, id=key)
  87. def _get_server(self):
  88. """Connect to the Elasticsearch server."""
  89. return elasticsearch.Elasticsearch('%s:%s' % (self.host, self.port))
  90. @property
  91. def server(self):
  92. if self._server is None:
  93. self._server = self._get_server()
  94. return self._server