test_couchbase.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. """Tests for the CouchbaseBackend."""
  2. from __future__ import absolute_import, unicode_literals
  3. import pytest
  4. from case import MagicMock, Mock, patch, sentinel, skip
  5. from kombu.utils.encoding import str_t
  6. from celery.app import backends
  7. from celery.backends import couchbase as module
  8. from celery.backends.couchbase import CouchbaseBackend
  9. from celery.exceptions import ImproperlyConfigured
  10. try:
  11. import couchbase
  12. except ImportError:
  13. couchbase = None # noqa
  14. COUCHBASE_BUCKET = 'celery_bucket'
  15. @skip.unless_module('couchbase')
  16. class test_CouchbaseBackend:
  17. def setup(self):
  18. self.backend = CouchbaseBackend(app=self.app)
  19. def test_init_no_couchbase(self):
  20. prev, module.Couchbase = module.Couchbase, None
  21. try:
  22. with pytest.raises(ImproperlyConfigured):
  23. CouchbaseBackend(app=self.app)
  24. finally:
  25. module.Couchbase = prev
  26. def test_init_no_settings(self):
  27. self.app.conf.couchbase_backend_settings = []
  28. with pytest.raises(ImproperlyConfigured):
  29. CouchbaseBackend(app=self.app)
  30. def test_init_settings_is_None(self):
  31. self.app.conf.couchbase_backend_settings = None
  32. CouchbaseBackend(app=self.app)
  33. def test_get_connection_connection_exists(self):
  34. with patch('couchbase.connection.Connection') as mock_Connection:
  35. self.backend._connection = sentinel._connection
  36. connection = self.backend._get_connection()
  37. assert sentinel._connection == connection
  38. mock_Connection.assert_not_called()
  39. def test_get(self):
  40. self.app.conf.couchbase_backend_settings = {}
  41. x = CouchbaseBackend(app=self.app)
  42. x._connection = Mock()
  43. mocked_get = x._connection.get = Mock()
  44. mocked_get.return_value.value = sentinel.retval
  45. # should return None
  46. assert x.get('1f3fab') == sentinel.retval
  47. x._connection.get.assert_called_once_with('1f3fab')
  48. def test_set(self):
  49. self.app.conf.couchbase_backend_settings = None
  50. x = CouchbaseBackend(app=self.app)
  51. x._connection = MagicMock()
  52. x._connection.set = MagicMock()
  53. # should return None
  54. assert x.set(sentinel.key, sentinel.value) is None
  55. def test_delete(self):
  56. self.app.conf.couchbase_backend_settings = {}
  57. x = CouchbaseBackend(app=self.app)
  58. x._connection = Mock()
  59. mocked_delete = x._connection.delete = Mock()
  60. mocked_delete.return_value = None
  61. # should return None
  62. assert x.delete('1f3fab') is None
  63. x._connection.delete.assert_called_once_with('1f3fab')
  64. def test_config_params(self):
  65. self.app.conf.couchbase_backend_settings = {
  66. 'bucket': 'mycoolbucket',
  67. 'host': ['here.host.com', 'there.host.com'],
  68. 'username': 'johndoe',
  69. 'password': 'mysecret',
  70. 'port': '1234',
  71. }
  72. x = CouchbaseBackend(app=self.app)
  73. assert x.bucket == 'mycoolbucket'
  74. assert x.host == ['here.host.com', 'there.host.com']
  75. assert x.username == 'johndoe'
  76. assert x.password == 'mysecret'
  77. assert x.port == 1234
  78. def test_backend_by_url(self, url='couchbase://myhost/mycoolbucket'):
  79. from celery.backends.couchbase import CouchbaseBackend
  80. backend, url_ = backends.by_url(url, self.app.loader)
  81. assert backend is CouchbaseBackend
  82. assert url_ == url
  83. def test_backend_params_by_url(self):
  84. url = 'couchbase://johndoe:mysecret@myhost:123/mycoolbucket'
  85. with self.Celery(backend=url) as app:
  86. x = app.backend
  87. assert x.bucket == 'mycoolbucket'
  88. assert x.host == 'myhost'
  89. assert x.username == 'johndoe'
  90. assert x.password == 'mysecret'
  91. assert x.port == 123
  92. def test_correct_key_types(self):
  93. keys = [
  94. self.backend.get_key_for_task('task_id', bytes('key')),
  95. self.backend.get_key_for_chord('group_id', bytes('key')),
  96. self.backend.get_key_for_group('group_id', bytes('key')),
  97. self.backend.get_key_for_task('task_id', 'key'),
  98. self.backend.get_key_for_chord('group_id', 'key'),
  99. self.backend.get_key_for_group('group_id', 'key'),
  100. ]
  101. for key in keys:
  102. assert isinstance(key, str_t)