consul.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # -*- coding: utf-8 -*-
  2. """Consul result store backend.
  3. - :class:`ConsulBackend` implements KeyValueStoreBackend to store results
  4. in the key-value store of Consul.
  5. """
  6. from __future__ import absolute_import, unicode_literals
  7. from kombu.utils.url import parse_url
  8. from celery.exceptions import ImproperlyConfigured
  9. from celery.backends.base import KeyValueStoreBackend, PY3
  10. from celery.utils.log import get_logger
  11. try:
  12. import consul
  13. except ImportError:
  14. consul = None
  15. logger = get_logger(__name__)
  16. __all__ = ['ConsulBackend']
  17. CONSUL_MISSING = """\
  18. You need to install the python-consul library in order to use \
  19. the Consul result store backend."""
  20. class ConsulBackend(KeyValueStoreBackend):
  21. """Consul.io K/V store backend for Celery."""
  22. consul = consul
  23. supports_autoexpire = True
  24. client = None
  25. consistency = 'consistent'
  26. path = None
  27. def __init__(self, *args, **kwargs):
  28. super(ConsulBackend, self).__init__(*args, **kwargs)
  29. if self.consul is None:
  30. raise ImproperlyConfigured(CONSUL_MISSING)
  31. self._init_from_params(**parse_url(self.url))
  32. def _init_from_params(self, hostname, port, virtual_host, **params):
  33. logger.debug('Setting on Consul client to connect to %s:%d',
  34. hostname, port)
  35. self.path = virtual_host
  36. self.client = consul.Consul(host=hostname, port=port,
  37. consistency=self.consistency)
  38. def _key_to_consul_key(self, key):
  39. if PY3:
  40. key = key.encode('utf-8')
  41. return key if self.path is None else '{0}/{1}'.format(self.path, key)
  42. def get(self, key):
  43. key = self._key_to_consul_key(key)
  44. logger.debug('Trying to fetch key %s from Consul', key)
  45. try:
  46. _, data = self.client.kv.get(key)
  47. return data['Value']
  48. except TypeError:
  49. pass
  50. def mget(self, keys):
  51. for key in keys:
  52. yield self.get(key)
  53. def set(self, key, value):
  54. """Set a key in Consul.
  55. Before creating the key it will create a session inside Consul
  56. where it creates a session with a TTL
  57. The key created afterwards will reference to the session's ID.
  58. If the session expires it will remove the key so that results
  59. can auto expire from the K/V store
  60. """
  61. session_name = key
  62. if PY3:
  63. session_name = key.decode('utf-8')
  64. key = self._key_to_consul_key(key)
  65. logger.debug('Trying to create Consul session %s with TTL %d',
  66. session_name, self.expires)
  67. session_id = self.client.session.create(name=session_name,
  68. behavior='delete',
  69. ttl=self.expires)
  70. logger.debug('Created Consul session %s', session_id)
  71. logger.debug('Writing key %s to Consul', key)
  72. return self.client.kv.put(key=key,
  73. value=value,
  74. acquire=session_id)
  75. def delete(self, key):
  76. key = self._key_to_consul_key(key)
  77. logger.debug('Removing key %s from Consul', key)
  78. return self.client.kv.delete(key)