certificate.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. # -*- coding: utf-8 -*-
  2. """X.509 certificates."""
  3. from __future__ import absolute_import, unicode_literals
  4. import glob
  5. import os
  6. from kombu.utils.encoding import bytes_to_str
  7. from celery.exceptions import SecurityError
  8. from celery.five import values
  9. from .utils import crypto, reraise_errors
  10. __all__ = ('Certificate', 'CertStore', 'FSCertStore')
  11. class Certificate(object):
  12. """X.509 certificate."""
  13. def __init__(self, cert):
  14. assert crypto is not None
  15. with reraise_errors('Invalid certificate: {0!r}'):
  16. self._cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
  17. def has_expired(self):
  18. """Check if the certificate has expired."""
  19. return self._cert.has_expired()
  20. def get_serial_number(self):
  21. """Return the serial number in the certificate."""
  22. return bytes_to_str(self._cert.get_serial_number())
  23. def get_issuer(self):
  24. """Return issuer (CA) as a string."""
  25. return ' '.join(bytes_to_str(x[1]) for x in
  26. self._cert.get_issuer().get_components())
  27. def get_id(self):
  28. """Serial number/issuer pair uniquely identifies a certificate."""
  29. return '{0} {1}'.format(self.get_issuer(), self.get_serial_number())
  30. def verify(self, data, signature, digest):
  31. """Verify signature for string containing data."""
  32. with reraise_errors('Bad signature: {0!r}'):
  33. crypto.verify(self._cert, signature, data, digest)
  34. class CertStore(object):
  35. """Base class for certificate stores."""
  36. def __init__(self):
  37. self._certs = {}
  38. def itercerts(self):
  39. """Return certificate iterator."""
  40. for c in values(self._certs):
  41. yield c
  42. def __getitem__(self, id):
  43. """Get certificate by id."""
  44. try:
  45. return self._certs[bytes_to_str(id)]
  46. except KeyError:
  47. raise SecurityError('Unknown certificate: {0!r}'.format(id))
  48. def add_cert(self, cert):
  49. cert_id = bytes_to_str(cert.get_id())
  50. if cert_id in self._certs:
  51. raise SecurityError('Duplicate certificate: {0!r}'.format(id))
  52. self._certs[cert_id] = cert
  53. class FSCertStore(CertStore):
  54. """File system certificate store."""
  55. def __init__(self, path):
  56. CertStore.__init__(self)
  57. if os.path.isdir(path):
  58. path = os.path.join(path, '*')
  59. for p in glob.glob(path):
  60. with open(p) as f:
  61. cert = Certificate(f.read())
  62. if cert.has_expired():
  63. raise SecurityError(
  64. 'Expired certificate: {0!r}'.format(cert.get_id()))
  65. self.add_cert(cert)