# test_consul.py (979 B)
  1. from __future__ import absolute_import, unicode_literals
  2. from case import Mock, skip
  3. from celery.backends.consul import ConsulBackend
  4. @skip.unless_module('consul')
  5. class test_ConsulBackend:
  6. def setup(self):
  7. self.backend = ConsulBackend(
  8. app=self.app, url='consul://localhost:800')
  9. def test_supports_autoexpire(self):
  10. assert self.backend.supports_autoexpire
  11. def test_consul_consistency(self):
  12. assert self.backend.consistency == 'consistent'
  13. def test_get(self):
  14. index = 100
  15. data = {'Key': 'test-consul-1', 'Value': 'mypayload'}
  16. self.backend.client = Mock(name='c.client')
  17. self.backend.client.kv.get.return_value = (index, data)
  18. assert self.backend.get(data['Key']) == 'mypayload'
  19. def test_index_bytes_key(self):
  20. key = 'test-consul-2'
  21. assert self.backend._key_to_consul_key(key) == key
  22. assert self.backend._key_to_consul_key(key.encode('utf-8')) == key