# test_certificate.py
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. from celery.exceptions import SecurityError
  4. from celery.security.certificate import Certificate, CertStore, FSCertStore
  5. from mock import Mock, patch
  6. from . import CERT1, CERT2, KEY1
  7. from .case import SecurityCase
  8. from celery.tests.utils import mock_open
  class test_Certificate(SecurityCase):
      """Construction and expiry checks for ``Certificate``."""

      def test_valid_certificate(self):
          # Both fixture certificates must load without raising.
          for pem in (CERT1, CERT2):
              Certificate(pem)

      def test_invalid_certificate(self):
          # ``None`` is expected to be rejected with TypeError, not
          # SecurityError.
          with self.assertRaises(TypeError):
              Certificate(None)

          # Every other malformed input must raise SecurityError: empty
          # text, arbitrary text, CERT1 with the character at index 20
          # dropped, and the KEY1 fixture.
          damaged = CERT1[:20] + CERT1[21:]
          for bad_input in ('', 'foo', damaged, KEY1):
              with self.assertRaises(SecurityError):
                  Certificate(bad_input)

      def test_has_expired(self):
          # The CERT1 fixture is expected to report itself as expired.
          cert = Certificate(CERT1)
          self.assertTrue(cert.has_expired())
  class test_CertStore(SecurityCase):
      """Checks iteration and duplicate handling of ``CertStore``."""

      def test_itercerts(self):
          cert1 = Certificate(CERT1)
          cert2 = Certificate(CERT2)
          certstore = CertStore()

          # A freshly created store must not yield any certificate.
          self.assertEqual(list(certstore.itercerts()), [])

          certstore.add_cert(cert1)
          certstore.add_cert(cert2)

          # Materialise the iterator so the count can be asserted: a bare
          # ``for`` loop over itercerts() would pass vacuously if the
          # store yielded nothing after the two adds.
          stored = list(certstore.itercerts())
          self.assertEqual(len(stored), 2)
          for c in stored:
              self.assertIn(c, (cert1, cert2))

      def test_duplicate(self):
          cert1 = Certificate(CERT1)
          certstore = CertStore()
          certstore.add_cert(cert1)

          # Adding the very same certificate a second time must be refused.
          self.assertRaises(SecurityError, certstore.add_cert, cert1)
  class test_FSCertStore(SecurityCase):
      """Exercises ``FSCertStore`` construction with the filesystem mocked."""

      @patch('os.path.isdir')
      @patch('glob.glob')
      @patch('celery.security.certificate.Certificate')
      def test_init(self, Certificate, glob, isdir):
          # Decorators apply bottom-up, so the mocks arrive in the order
          # Certificate, glob, isdir.
          fake_cert = Mock()
          Certificate.return_value = fake_cert
          fake_cert.has_expired.return_value = False
          isdir.return_value = True
          glob.return_value = ['foo.cert']

          # NOTE(review): every FSCertStore construction below is kept
          # inside the mock_open() context -- confirm this matches the
          # intended extent of the ``with`` block.
          with mock_open():
              fake_cert.get_id.return_value = 1
              store = FSCertStore('/var/certs')
              self.assertIn(1, store._certs)
              glob.assert_called_with('/var/certs/*')

              # they both end up with the same id
              glob.return_value = ['foo.cert', 'bar.cert']
              self.assertRaises(SecurityError, FSCertStore, '/var/certs')
              glob.return_value = ['foo.cert']

              # A certificate reporting itself as expired is rejected.
              fake_cert.has_expired.return_value = True
              self.assertRaises(SecurityError, FSCertStore, '/var/certs')

              # A path that is not a directory is rejected.
              isdir.return_value = False
              self.assertRaises(SecurityError, FSCertStore, '/var/certs')