test_couchbase.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. """Tests for the CouchbaseBackend."""
  2. from __future__ import absolute_import, unicode_literals
  3. import pytest
  4. from kombu.utils.encoding import str_t
  5. from case import MagicMock, Mock, patch, sentinel, skip
  6. from celery.app import backends
  7. from celery.backends import couchbase as module
  8. from celery.backends.couchbase import CouchbaseBackend
  9. from celery.exceptions import ImproperlyConfigured
# The couchbase client library is optional: bind the name to ``None`` when
# the import fails so that this test module itself can still be imported.
try:
    import couchbase
except ImportError:
    couchbase = None  # noqa

# Bucket name constant; not referenced by the tests visible in this file.
COUCHBASE_BUCKET = 'celery_bucket'
  15. @skip.unless_module('couchbase')
  16. class test_CouchbaseBackend:
  17. def setup(self):
  18. self.backend = CouchbaseBackend(app=self.app)
  19. def test_init_no_couchbase(self):
  20. prev, module.Couchbase = module.Couchbase, None
  21. try:
  22. with pytest.raises(ImproperlyConfigured):
  23. CouchbaseBackend(app=self.app)
  24. finally:
  25. module.Couchbase = prev
  26. def test_init_no_settings(self):
  27. self.app.conf.couchbase_backend_settings = []
  28. with pytest.raises(ImproperlyConfigured):
  29. CouchbaseBackend(app=self.app)
  30. def test_init_settings_is_None(self):
  31. self.app.conf.couchbase_backend_settings = None
  32. CouchbaseBackend(app=self.app)
  33. def test_get_connection_connection_exists(self):
  34. with patch('couchbase.connection.Connection') as mock_Connection:
  35. self.backend._connection = sentinel._connection
  36. connection = self.backend._get_connection()
  37. assert sentinel._connection == connection
  38. mock_Connection.assert_not_called()
  39. def test_get(self):
  40. self.app.conf.couchbase_backend_settings = {}
  41. x = CouchbaseBackend(app=self.app)
  42. x._connection = Mock()
  43. mocked_get = x._connection.get = Mock()
  44. mocked_get.return_value.value = sentinel.retval
  45. # should return None
  46. assert x.get('1f3fab') == sentinel.retval
  47. x._connection.get.assert_called_once_with('1f3fab')
  48. def test_set(self):
  49. self.app.conf.couchbase_backend_settings = None
  50. x = CouchbaseBackend(app=self.app)
  51. x._connection = MagicMock()
  52. x._connection.set = MagicMock()
  53. # should return None
  54. assert x.set(sentinel.key, sentinel.value) is None
  55. def test_delete(self):
  56. self.app.conf.couchbase_backend_settings = {}
  57. x = CouchbaseBackend(app=self.app)
  58. x._connection = Mock()
  59. mocked_delete = x._connection.delete = Mock()
  60. mocked_delete.return_value = None
  61. # should return None
  62. assert x.delete('1f3fab') is None
  63. x._connection.delete.assert_called_once_with('1f3fab')
  64. def test_config_params(self):
  65. self.app.conf.couchbase_backend_settings = {
  66. 'bucket': 'mycoolbucket',
  67. 'host': ['here.host.com', 'there.host.com'],
  68. 'username': 'johndoe',
  69. 'password': 'mysecret',
  70. 'port': '1234',
  71. }
  72. x = CouchbaseBackend(app=self.app)
  73. assert x.bucket == 'mycoolbucket'
  74. assert x.host == ['here.host.com', 'there.host.com']
  75. assert x.username == 'johndoe'
  76. assert x.password == 'mysecret'
  77. assert x.port == 1234
  78. def test_backend_by_url(self, url='couchbase://myhost/mycoolbucket'):
  79. from celery.backends.couchbase import CouchbaseBackend
  80. backend, url_ = backends.by_url(url, self.app.loader)
  81. assert backend is CouchbaseBackend
  82. assert url_ == url
  83. def test_backend_params_by_url(self):
  84. url = 'couchbase://johndoe:mysecret@myhost:123/mycoolbucket'
  85. with self.Celery(backend=url) as app:
  86. x = app.backend
  87. assert x.bucket == 'mycoolbucket'
  88. assert x.host == 'myhost'
  89. assert x.username == 'johndoe'
  90. assert x.password == 'mysecret'
  91. assert x.port == 123
  92. def test_correct_key_types(self):
  93. keys = [
  94. self.backend.get_key_for_task('task_id', bytes('key')),
  95. self.backend.get_key_for_chord('group_id', bytes('key')),
  96. self.backend.get_key_for_group('group_id', bytes('key')),
  97. self.backend.get_key_for_task('task_id', 'key'),
  98. self.backend.get_key_for_chord('group_id', 'key'),
  99. self.backend.get_key_for_group('group_id', 'key'),
  100. ]
  101. for key in keys:
  102. assert isinstance(key, str_t)