certificate.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.security.certificate
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  5. X.509 certificates.
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import glob
  10. import os
  11. from celery.exceptions import SecurityError
  12. from .utils import crypto, reraise_errors
  class Certificate(object):
      """Wrapper around an X.509 certificate loaded from PEM data."""

      def __init__(self, cert):
          """Load the PEM-encoded certificate passed in ``cert``.

          The load runs inside ``reraise_errors('Invalid certificate: %r')``.
          """
          # NOTE(review): presumably ``crypto`` is None when the OpenSSL
          # bindings could not be imported -- confirm in ``.utils``.
          assert crypto is not None
          with reraise_errors('Invalid certificate: %r'):
              loaded = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
              self._cert = loaded

      def has_expired(self):
          """Return the result of the wrapped certificate's expiry check."""
          expired = self._cert.has_expired()
          return expired

      def get_serial_number(self):
          """Return the serial number of the wrapped certificate."""
          serial = self._cert.get_serial_number()
          return serial

      def get_issuer(self):
          """Return the issuer (CA) as a single space-separated string.

          The string is built from the second element of each issuer
          component, in the order the components are reported.
          """
          components = self._cert.get_issuer().get_components()
          values = [component[1] for component in components]
          return ' '.join(values)

      def get_id(self):
          """Return the certificate id: the issuer followed by the serial.

          The issuer/serial number pair uniquely identifies a certificate.
          """
          issuer = self.get_issuer()
          serial = self.get_serial_number()
          return '%s %s' % (issuer, serial)

      def verify(self, data, signature, digest):
          """Verify ``signature`` for ``data`` using ``digest``.

          The check runs inside ``reraise_errors('Bad signature: %r')``;
          nothing is returned.
          """
          with reraise_errors('Bad signature: %r'):
              crypto.verify(self._cert, signature, data, digest)
  class CertStore(object):
      """Base class for certificate stores.

      Certificates are held in a dictionary keyed by the value each
      certificate returns from ``get_id()``.
      """

      def __init__(self):
          """Create an empty store."""
          self._certs = {}

      def itercerts(self):
          """Return an iterator over the stored certificates."""
          # ``dict.itervalues()`` only exists on Python 2; ``values()`` is
          # available on both Python 2 and Python 3.
          for cert in self._certs.values():
              yield cert

      def __getitem__(self, id):
          """Return the certificate stored under ``id``.

          Raises ``SecurityError`` when no certificate has that id.
          """
          try:
              return self._certs[id]
          except KeyError:
              raise SecurityError('Unknown certificate: %r' % (id, ))

      def add_cert(self, cert):
          """Add ``cert`` to the store under its ``get_id()`` value.

          Raises ``SecurityError`` when a certificate with the same id is
          already stored.
          """
          cert_id = cert.get_id()
          if cert_id in self._certs:
              # Report the certificate's own id; the bare name ``id`` here
              # would be the builtin function, not the identifier.
              raise SecurityError('Duplicate certificate: %r' % (cert_id, ))
          self._certs[cert_id] = cert
  class FSCertStore(CertStore):
      """Certificate store populated from files on the file system."""

      def __init__(self, path):
          """Load every certificate file matched by ``path``.

          ``path`` is passed to ``glob.glob``; when it names a directory,
          every entry directly inside that directory is matched instead.

          Raises ``SecurityError`` when a loaded certificate has expired.
          """
          CertStore.__init__(self)
          pattern = os.path.join(path, '*') if os.path.isdir(path) else path
          for filename in glob.glob(pattern):
              with open(filename) as handle:
                  pem_data = handle.read()
              certificate = Certificate(pem_data)
              if certificate.has_expired():
                  message = 'Expired certificate: %r' % (certificate.get_id(), )
                  raise SecurityError(message)
              self.add_cert(certificate)