12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- from __future__ import absolute_import
- from celery.exceptions import SecurityError
- from celery.security.certificate import Certificate, CertStore, FSCertStore
- from mock import Mock, patch
- from . import CERT1, CERT2, KEY1
- from .case import SecurityCase
- from celery.tests.utils import mock_open
- class test_Certificate(SecurityCase):
- def test_valid_certificate(self):
- Certificate(CERT1)
- Certificate(CERT2)
- def test_invalid_certificate(self):
- self.assertRaises(TypeError, Certificate, None)
- self.assertRaises(SecurityError, Certificate, "")
- self.assertRaises(SecurityError, Certificate, "foo")
- self.assertRaises(SecurityError, Certificate, CERT1[:20] + CERT1[21:])
- self.assertRaises(SecurityError, Certificate, KEY1)
- def test_has_expired(self):
- self.assertFalse(Certificate(CERT1).has_expired())
- class test_CertStore(SecurityCase):
- def test_itercerts(self):
- cert1 = Certificate(CERT1)
- cert2 = Certificate(CERT2)
- certstore = CertStore()
- for c in certstore.itercerts():
- self.assertTrue(False)
- certstore.add_cert(cert1)
- certstore.add_cert(cert2)
- for c in certstore.itercerts():
- self.assertIn(c, (cert1, cert2))
- def test_duplicate(self):
- cert1 = Certificate(CERT1)
- certstore = CertStore()
- certstore.add_cert(cert1)
- self.assertRaises(SecurityError, certstore.add_cert, cert1)
- class test_FSCertStore(SecurityCase):
- @patch("os.path.isdir")
- @patch("glob.glob")
- @patch("celery.security.certificate.Certificate")
- def test_init(self, Certificate, glob, isdir):
- cert = Certificate.return_value = Mock()
- cert.has_expired.return_value = False
- isdir.return_value = True
- glob.return_value = ["foo.cert"]
- with mock_open():
- cert.get_id.return_value = 1
- x = FSCertStore("/var/certs")
- self.assertIn(1, x._certs)
- glob.assert_called_with("/var/certs/*")
- # they both end up with the same id
- glob.return_value = ["foo.cert", "bar.cert"]
- with self.assertRaises(SecurityError):
- x = FSCertStore("/var/certs")
- glob.return_value = ["foo.cert"]
- cert.has_expired.return_value = True
- with self.assertRaises(SecurityError):
- x = FSCertStore("/var/certs")
- isdir.return_value = False
- with self.assertRaises(SecurityError):
- x = FSCertStore("/var/certs")
|