# test_certificate.py - unit tests for celery.security.certificate
# (Certificate, CertStore, FSCertStore).
  1. from __future__ import absolute_import
  2. from celery.exceptions import SecurityError
  3. from celery.security.certificate import Certificate, CertStore, FSCertStore
  4. from . import CERT1, CERT2, KEY1
  5. from .case import SecurityCase
  6. from celery.tests.case import Mock, mock_open, patch
  7. class test_Certificate(SecurityCase):
  8. def test_valid_certificate(self):
  9. Certificate(CERT1)
  10. Certificate(CERT2)
  11. def test_invalid_certificate(self):
  12. self.assertRaises((SecurityError, TypeError), Certificate, None)
  13. self.assertRaises(SecurityError, Certificate, '')
  14. self.assertRaises(SecurityError, Certificate, 'foo')
  15. self.assertRaises(SecurityError, Certificate, CERT1[:20] + CERT1[21:])
  16. self.assertRaises(SecurityError, Certificate, KEY1)
  17. def test_has_expired(self):
  18. self.assertFalse(Certificate(CERT1).has_expired())
  19. class test_CertStore(SecurityCase):
  20. def test_itercerts(self):
  21. cert1 = Certificate(CERT1)
  22. cert2 = Certificate(CERT2)
  23. certstore = CertStore()
  24. for c in certstore.itercerts():
  25. self.assertTrue(False)
  26. certstore.add_cert(cert1)
  27. certstore.add_cert(cert2)
  28. for c in certstore.itercerts():
  29. self.assertIn(c, (cert1, cert2))
  30. def test_duplicate(self):
  31. cert1 = Certificate(CERT1)
  32. certstore = CertStore()
  33. certstore.add_cert(cert1)
  34. self.assertRaises(SecurityError, certstore.add_cert, cert1)
  35. class test_FSCertStore(SecurityCase):
  36. @patch('os.path.isdir')
  37. @patch('glob.glob')
  38. @patch('celery.security.certificate.Certificate')
  39. def test_init(self, Certificate, glob, isdir):
  40. cert = Certificate.return_value = Mock()
  41. cert.has_expired.return_value = False
  42. isdir.return_value = True
  43. glob.return_value = ['foo.cert']
  44. with mock_open():
  45. cert.get_id.return_value = 1
  46. x = FSCertStore('/var/certs')
  47. self.assertIn(1, x._certs)
  48. glob.assert_called_with('/var/certs/*')
  49. # they both end up with the same id
  50. glob.return_value = ['foo.cert', 'bar.cert']
  51. with self.assertRaises(SecurityError):
  52. x = FSCertStore('/var/certs')
  53. glob.return_value = ['foo.cert']
  54. cert.has_expired.return_value = True
  55. with self.assertRaises(SecurityError):
  56. x = FSCertStore('/var/certs')
  57. isdir.return_value = False
  58. with self.assertRaises(SecurityError):
  59. x = FSCertStore('/var/certs')