test_certificate.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import pytest
  2. from case import Mock, mock, patch, skip
  3. from celery.exceptions import SecurityError
  4. from celery.security.certificate import Certificate, CertStore, FSCertStore
  5. from . import CERT1, CERT2, KEY1
  6. from .case import SecurityCase
  class test_Certificate(SecurityCase):
      """Tests for building ``Certificate`` objects and querying expiry."""

      def test_valid_certificate(self):
          """Both fixture certificates can be constructed without raising."""
          for pem in (CERT1, CERT2):
              Certificate(pem)

      def test_invalid_certificate(self):
          """Unusable input is rejected when constructing a ``Certificate``."""
          # ``None`` is allowed to fail with either exception type.
          with pytest.raises((SecurityError, TypeError)):
              Certificate(None)

          # CERT1 with the single character at index 20 removed.
          damaged_cert = CERT1[:20] + CERT1[21:]
          for bad_input in ('', 'foo', damaged_cert, KEY1):
              with pytest.raises(SecurityError):
                  Certificate(bad_input)

      @skip.todo(reason='cert expired')
      def test_has_expired(self):
          """The CERT1 fixture is expected to report itself as not expired."""
          cert = Certificate(CERT1)
          assert not cert.has_expired()

      def test_has_expired_mock(self):
          """``has_expired`` returns the result of the wrapped ``_cert``."""
          cert = Certificate(CERT1)
          # Swap the underlying certificate object for a mock so the result
          # can be compared by identity.
          cert._cert = Mock(name='cert')
          outcome = cert.has_expired()
          assert outcome is cert._cert.has_expired()
  class test_CertStore(SecurityCase):
      """Tests for the in-memory ``CertStore`` container."""

      def test_itercerts(self):
          """``itercerts`` yields nothing when empty, then every added cert."""
          cert1 = Certificate(CERT1)
          cert2 = Certificate(CERT2)
          certstore = CertStore()

          # A freshly created store must not yield any certificate.
          assert list(certstore.itercerts()) == []

          certstore.add_cert(cert1)
          certstore.add_cert(cert2)

          # Materialise the iterator once so the count can be checked too:
          # a per-item membership loop alone would pass vacuously if the
          # store yielded nothing (or only one of the two certificates).
          stored = list(certstore.itercerts())
          assert len(stored) == 2
          assert cert1 in stored
          assert cert2 in stored

      def test_duplicate(self):
          """Adding the same certificate twice raises ``SecurityError``."""
          cert1 = Certificate(CERT1)
          certstore = CertStore()
          certstore.add_cert(cert1)
          with pytest.raises(SecurityError):
              certstore.add_cert(cert1)
  class test_FSCertStore(SecurityCase):
      """Tests for ``FSCertStore`` with the filesystem and parser mocked."""

      @patch('os.path.isdir')
      @patch('glob.glob')
      @patch('celery.security.certificate.Certificate')
      def test_init(self, Certificate, glob, isdir):
          """Cover the successful load and the three rejected setups."""
          # Patches are injected bottom-up, hence the argument order above.
          cert = Mock()
          Certificate.return_value = cert
          cert.has_expired.return_value = False
          cert.get_id.return_value = 1
          isdir.return_value = True
          glob.return_value = ['foo.cert']

          with mock.open():
              # One unexpired certificate: stored under its id.
              store = FSCertStore('/var/certs')
              assert 1 in store._certs
              glob.assert_called_with('/var/certs/*')

              # they both end up with the same id
              glob.return_value = ['foo.cert', 'bar.cert']
              with pytest.raises(SecurityError):
                  FSCertStore('/var/certs')
              glob.return_value = ['foo.cert']

              # The single certificate now reports itself as expired.
              cert.has_expired.return_value = True
              with pytest.raises(SecurityError):
                  FSCertStore('/var/certs')

              # The given path is no longer reported as a directory.
              isdir.return_value = False
              with pytest.raises(SecurityError):
                  FSCertStore('/var/certs')