test_couchdb.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. from __future__ import absolute_import
  2. from celery.backends import couchdb as module
  3. from celery.backends.couchdb import CouchBackend
  4. from celery.exceptions import ImproperlyConfigured
  5. from celery import backends
  6. from celery.tests.case import (
  7. AppCase, Mock, SkipTest, patch, sentinel,
  8. )
# pycouchdb is an optional dependency.  When the import fails the name is
# bound to None so that the test case's setup() can detect the absence and
# skip, rather than the whole module failing at import time.
try:
    import pycouchdb
except ImportError:
    pycouchdb = None # noqa
# Container name constant.
# NOTE(review): not referenced by any test visible in this file -- confirm
# whether it is still needed.
COUCHDB_CONTAINER = 'celery_container'
  14. class test_CouchBackend(AppCase):
  15. def setup(self):
  16. if pycouchdb is None:
  17. raise SkipTest('pycouchdb is not installed.')
  18. self.backend = CouchBackend(app=self.app)
  19. def test_init_no_pycouchdb(self):
  20. """test init no pycouchdb raises"""
  21. prev, module.pycouchdb = module.pycouchdb, None
  22. try:
  23. with self.assertRaises(ImproperlyConfigured):
  24. CouchBackend(app=self.app)
  25. finally:
  26. module.pycouchdb = prev
  27. def test_get_container_exists(self):
  28. with patch('pycouchdb.client.Database') as mock_Connection:
  29. self.backend._connection = sentinel._connection
  30. connection = self.backend._get_connection()
  31. self.assertEqual(sentinel._connection, connection)
  32. self.assertFalse(mock_Connection.called)
  33. def test_get(self):
  34. """test_get
  35. CouchBackend.get should return and take two params
  36. db conn to couchdb is mocked.
  37. TODO Should test on key not exists
  38. """
  39. x = CouchBackend(app=self.app)
  40. x._connection = Mock()
  41. mocked_get = x._connection.get = Mock()
  42. mocked_get.return_value = sentinel.retval
  43. # should return None
  44. self.assertEqual(x.get('1f3fab'), sentinel.retval)
  45. x._connection.get.assert_called_once_with('1f3fab')
  46. def test_delete(self):
  47. """test_delete
  48. CouchBackend.delete should return and take two params
  49. db conn to pycouchdb is mocked.
  50. TODO Should test on key not exists
  51. """
  52. x = CouchBackend(app=self.app)
  53. x._connection = Mock()
  54. mocked_delete = x._connection.delete = Mock()
  55. mocked_delete.return_value = None
  56. # should return None
  57. self.assertIsNone(x.delete('1f3fab'))
  58. x._connection.delete.assert_called_once_with('1f3fab')
  59. def test_backend_by_url(self, url='couchdb://myhost/mycoolcontainer'):
  60. from celery.backends.couchdb import CouchBackend
  61. backend, url_ = backends.get_backend_by_url(url, self.app.loader)
  62. self.assertIs(backend, CouchBackend)
  63. self.assertEqual(url_, url)
  64. def test_backend_params_by_url(self):
  65. url = 'couchdb://johndoe:mysecret@myhost:123/mycoolcontainer'
  66. with self.Celery(backend=url) as app:
  67. x = app.backend
  68. self.assertEqual(x.container, 'mycoolcontainer')
  69. self.assertEqual(x.host, 'myhost')
  70. self.assertEqual(x.username, 'johndoe')
  71. self.assertEqual(x.password, 'mysecret')
  72. self.assertEqual(x.port, 123)