test_couchbase.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. """Tests for the CouchbaseBackend."""
  2. import pytest
  3. from kombu.utils.encoding import str_t
  4. from case import MagicMock, Mock, patch, sentinel, skip
  5. from celery.app import backends
  6. from celery.backends import couchbase as module
  7. from celery.backends.couchbase import CouchbaseBackend
  8. from celery.exceptions import ImproperlyConfigured
try:
    import couchbase
except ImportError:
    # The SDK is optional: bind the name to None so this module can still be
    # imported; the test class below is guarded by skip.unless_module.
    couchbase = None # noqa

# Bucket name constant; not referenced by any test visible in this file.
COUCHBASE_BUCKET = 'celery_bucket'
  14. @skip.unless_module('couchbase')
  15. class test_CouchbaseBackend:
  16. def setup(self):
  17. self.backend = CouchbaseBackend(app=self.app)
  18. def test_init_no_couchbase(self):
  19. prev, module.Couchbase = module.Couchbase, None
  20. try:
  21. with pytest.raises(ImproperlyConfigured):
  22. CouchbaseBackend(app=self.app)
  23. finally:
  24. module.Couchbase = prev
  25. def test_init_no_settings(self):
  26. self.app.conf.couchbase_backend_settings = []
  27. with pytest.raises(ImproperlyConfigured):
  28. CouchbaseBackend(app=self.app)
  29. def test_init_settings_is_None(self):
  30. self.app.conf.couchbase_backend_settings = None
  31. CouchbaseBackend(app=self.app)
  32. def test_get_connection_connection_exists(self):
  33. with patch('couchbase.connection.Connection') as mock_Connection:
  34. self.backend._connection = sentinel._connection
  35. connection = self.backend._get_connection()
  36. assert sentinel._connection == connection
  37. mock_Connection.assert_not_called()
  38. def test_get(self):
  39. self.app.conf.couchbase_backend_settings = {}
  40. x = CouchbaseBackend(app=self.app)
  41. x._connection = Mock()
  42. mocked_get = x._connection.get = Mock()
  43. mocked_get.return_value.value = sentinel.retval
  44. # should return None
  45. assert x.get('1f3fab') == sentinel.retval
  46. x._connection.get.assert_called_once_with('1f3fab')
  47. def test_set(self):
  48. self.app.conf.couchbase_backend_settings = None
  49. x = CouchbaseBackend(app=self.app)
  50. x._connection = MagicMock()
  51. x._connection.set = MagicMock()
  52. # should return None
  53. assert x.set(sentinel.key, sentinel.value) is None
  54. def test_delete(self):
  55. self.app.conf.couchbase_backend_settings = {}
  56. x = CouchbaseBackend(app=self.app)
  57. x._connection = Mock()
  58. mocked_delete = x._connection.delete = Mock()
  59. mocked_delete.return_value = None
  60. # should return None
  61. assert x.delete('1f3fab') is None
  62. x._connection.delete.assert_called_once_with('1f3fab')
  63. def test_config_params(self):
  64. self.app.conf.couchbase_backend_settings = {
  65. 'bucket': 'mycoolbucket',
  66. 'host': ['here.host.com', 'there.host.com'],
  67. 'username': 'johndoe',
  68. 'password': 'mysecret',
  69. 'port': '1234',
  70. }
  71. x = CouchbaseBackend(app=self.app)
  72. assert x.bucket == 'mycoolbucket'
  73. assert x.host == ['here.host.com', 'there.host.com']
  74. assert x.username == 'johndoe'
  75. assert x.password == 'mysecret'
  76. assert x.port == 1234
  77. def test_backend_by_url(self, url='couchbase://myhost/mycoolbucket'):
  78. from celery.backends.couchbase import CouchbaseBackend
  79. backend, url_ = backends.by_url(url, self.app.loader)
  80. assert backend is CouchbaseBackend
  81. assert url_ == url
  82. def test_backend_params_by_url(self):
  83. url = 'couchbase://johndoe:mysecret@myhost:123/mycoolbucket'
  84. with self.Celery(backend=url) as app:
  85. x = app.backend
  86. assert x.bucket == 'mycoolbucket'
  87. assert x.host == 'myhost'
  88. assert x.username == 'johndoe'
  89. assert x.password == 'mysecret'
  90. assert x.port == 123
  91. def test_correct_key_types(self):
  92. keys = [
  93. self.backend.get_key_for_task('task_id', bytes('key')),
  94. self.backend.get_key_for_chord('group_id', bytes('key')),
  95. self.backend.get_key_for_group('group_id', bytes('key')),
  96. self.backend.get_key_for_task('task_id', 'key'),
  97. self.backend.get_key_for_chord('group_id', 'key'),
  98. self.backend.get_key_for_group('group_id', 'key'),
  99. ]
  100. for key in keys:
  101. assert isinstance(key, str_t)