test_certificate.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, mock, patch, skip
  4. from celery.exceptions import SecurityError
  5. from celery.security.certificate import Certificate, CertStore, FSCertStore
  6. from . import CERT1, CERT2, KEY1
  7. from .case import SecurityCase
  8. class test_Certificate(SecurityCase):
  9. def test_valid_certificate(self):
  10. Certificate(CERT1)
  11. Certificate(CERT2)
  12. def test_invalid_certificate(self):
  13. with pytest.raises((SecurityError, TypeError)):
  14. Certificate(None)
  15. with pytest.raises(SecurityError):
  16. Certificate('')
  17. with pytest.raises(SecurityError):
  18. Certificate('foo')
  19. with pytest.raises(SecurityError):
  20. Certificate(CERT1[:20] + CERT1[21:])
  21. with pytest.raises(SecurityError):
  22. Certificate(KEY1)
  23. @skip.todo(reason='cert expired')
  24. def test_has_expired(self):
  25. assert not Certificate(CERT1).has_expired()
  26. def test_has_expired_mock(self):
  27. x = Certificate(CERT1)
  28. x._cert = Mock(name='cert')
  29. assert x.has_expired() is x._cert.has_expired()
  30. class test_CertStore(SecurityCase):
  31. def test_itercerts(self):
  32. cert1 = Certificate(CERT1)
  33. cert2 = Certificate(CERT2)
  34. certstore = CertStore()
  35. for c in certstore.itercerts():
  36. assert False
  37. certstore.add_cert(cert1)
  38. certstore.add_cert(cert2)
  39. for c in certstore.itercerts():
  40. assert c in (cert1, cert2)
  41. def test_duplicate(self):
  42. cert1 = Certificate(CERT1)
  43. certstore = CertStore()
  44. certstore.add_cert(cert1)
  45. with pytest.raises(SecurityError):
  46. certstore.add_cert(cert1)
  47. class test_FSCertStore(SecurityCase):
  48. @patch('os.path.isdir')
  49. @patch('glob.glob')
  50. @patch('celery.security.certificate.Certificate')
  51. def test_init(self, Certificate, glob, isdir):
  52. cert = Certificate.return_value = Mock()
  53. cert.has_expired.return_value = False
  54. isdir.return_value = True
  55. glob.return_value = ['foo.cert']
  56. with mock.open():
  57. cert.get_id.return_value = 1
  58. x = FSCertStore('/var/certs')
  59. assert 1 in x._certs
  60. glob.assert_called_with('/var/certs/*')
  61. # they both end up with the same id
  62. glob.return_value = ['foo.cert', 'bar.cert']
  63. with pytest.raises(SecurityError):
  64. x = FSCertStore('/var/certs')
  65. glob.return_value = ['foo.cert']
  66. cert.has_expired.return_value = True
  67. with pytest.raises(SecurityError):
  68. x = FSCertStore('/var/certs')
  69. isdir.return_value = False
  70. with pytest.raises(SecurityError):
  71. x = FSCertStore('/var/certs')