test_certificate.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. from __future__ import absolute_import
  2. from celery.exceptions import SecurityError
  3. from celery.security.certificate import Certificate, CertStore, FSCertStore
  4. from . import CERT1, CERT2, KEY1
  5. from .case import SecurityCase
  6. from celery.tests.case import Mock, SkipTest, mock_open, patch
  7. class test_Certificate(SecurityCase):
  8. def test_valid_certificate(self):
  9. Certificate(CERT1)
  10. Certificate(CERT2)
  11. def test_invalid_certificate(self):
  12. self.assertRaises((SecurityError, TypeError), Certificate, None)
  13. self.assertRaises(SecurityError, Certificate, '')
  14. self.assertRaises(SecurityError, Certificate, 'foo')
  15. self.assertRaises(SecurityError, Certificate, CERT1[:20] + CERT1[21:])
  16. self.assertRaises(SecurityError, Certificate, KEY1)
  17. def test_has_expired(self):
  18. raise SkipTest('cert expired')
  19. self.assertFalse(Certificate(CERT1).has_expired())
  20. class test_CertStore(SecurityCase):
  21. def test_itercerts(self):
  22. cert1 = Certificate(CERT1)
  23. cert2 = Certificate(CERT2)
  24. certstore = CertStore()
  25. for c in certstore.itercerts():
  26. self.assertTrue(False)
  27. certstore.add_cert(cert1)
  28. certstore.add_cert(cert2)
  29. for c in certstore.itercerts():
  30. self.assertIn(c, (cert1, cert2))
  31. def test_duplicate(self):
  32. cert1 = Certificate(CERT1)
  33. certstore = CertStore()
  34. certstore.add_cert(cert1)
  35. self.assertRaises(SecurityError, certstore.add_cert, cert1)
  36. class test_FSCertStore(SecurityCase):
  37. @patch('os.path.isdir')
  38. @patch('glob.glob')
  39. @patch('celery.security.certificate.Certificate')
  40. def test_init(self, Certificate, glob, isdir):
  41. cert = Certificate.return_value = Mock()
  42. cert.has_expired.return_value = False
  43. isdir.return_value = True
  44. glob.return_value = ['foo.cert']
  45. with mock_open():
  46. cert.get_id.return_value = 1
  47. x = FSCertStore('/var/certs')
  48. self.assertIn(1, x._certs)
  49. glob.assert_called_with('/var/certs/*')
  50. # they both end up with the same id
  51. glob.return_value = ['foo.cert', 'bar.cert']
  52. with self.assertRaises(SecurityError):
  53. x = FSCertStore('/var/certs')
  54. glob.return_value = ['foo.cert']
  55. cert.has_expired.return_value = True
  56. with self.assertRaises(SecurityError):
  57. x = FSCertStore('/var/certs')
  58. isdir.return_value = False
  59. with self.assertRaises(SecurityError):
  60. x = FSCertStore('/var/certs')