test_certificate.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. from __future__ import absolute_import
  2. from celery.exceptions import SecurityError
  3. from celery.security.certificate import Certificate, CertStore, FSCertStore
  4. from mock import Mock, patch
  5. from . import CERT1, CERT2, KEY1
  6. from .case import SecurityCase
  7. from celery.tests.utils import mock_open
  8. class test_Certificate(SecurityCase):
  9. def test_valid_certificate(self):
  10. Certificate(CERT1)
  11. Certificate(CERT2)
  12. def test_invalid_certificate(self):
  13. self.assertRaises(TypeError, Certificate, None)
  14. self.assertRaises(SecurityError, Certificate, "")
  15. self.assertRaises(SecurityError, Certificate, "foo")
  16. self.assertRaises(SecurityError, Certificate, CERT1[:20] + CERT1[21:])
  17. self.assertRaises(SecurityError, Certificate, KEY1)
  18. def test_has_expired(self):
  19. self.assertFalse(Certificate(CERT1).has_expired())
  20. class test_CertStore(SecurityCase):
  21. def test_itercerts(self):
  22. cert1 = Certificate(CERT1)
  23. cert2 = Certificate(CERT2)
  24. certstore = CertStore()
  25. for c in certstore.itercerts():
  26. self.assertTrue(False)
  27. certstore.add_cert(cert1)
  28. certstore.add_cert(cert2)
  29. for c in certstore.itercerts():
  30. self.assertIn(c, (cert1, cert2))
  31. def test_duplicate(self):
  32. cert1 = Certificate(CERT1)
  33. certstore = CertStore()
  34. certstore.add_cert(cert1)
  35. self.assertRaises(SecurityError, certstore.add_cert, cert1)
  36. class test_FSCertStore(SecurityCase):
  37. @patch("os.path.isdir")
  38. @patch("glob.glob")
  39. @patch("celery.security.certificate.Certificate")
  40. def test_init(self, Certificate, glob, isdir):
  41. cert = Certificate.return_value = Mock()
  42. cert.has_expired.return_value = False
  43. isdir.return_value = True
  44. glob.return_value = ["foo.cert"]
  45. with mock_open():
  46. cert.get_id.return_value = 1
  47. x = FSCertStore("/var/certs")
  48. self.assertIn(1, x._certs)
  49. glob.assert_called_with("/var/certs/*")
  50. # they both end up with the same id
  51. glob.return_value = ["foo.cert", "bar.cert"]
  52. with self.assertRaises(SecurityError):
  53. x = FSCertStore("/var/certs")
  54. glob.return_value = ["foo.cert"]
  55. cert.has_expired.return_value = True
  56. with self.assertRaises(SecurityError):
  57. x = FSCertStore("/var/certs")
  58. isdir.return_value = False
  59. with self.assertRaises(SecurityError):
  60. x = FSCertStore("/var/certs")