test_consul.py 719 B

1234567891011121314151617181920212223
  1. from case import Mock, skip
  2. from celery.backends.consul import ConsulBackend
  3. @skip.unless_module('consul')
  4. class test_ConsulBackend:
  5. def setup(self):
  6. self.backend = ConsulBackend(
  7. app=self.app, url='consul://localhost:800')
  8. def test_supports_autoexpire(self):
  9. assert self.backend.supports_autoexpire
  10. def test_consul_consistency(self):
  11. assert self.backend.consistency == 'consistent'
  12. def test_get(self):
  13. index = 100
  14. data = {'Key': 'test-consul-1', 'Value': 'mypayload'}
  15. self.backend.client = Mock(name='c.client')
  16. self.backend.client.kv.get.return_value = (index, data)
  17. assert self.backend.get(data['Key']) == 'mypayload'