12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- from __future__ import absolute_import
- import base64
- import sys
- from kombu.serialization import registry, encode, decode
- from ..exceptions import SecurityError
- from ..utils.encoding import bytes_to_str, str_to_bytes
- from .certificate import Certificate, FSCertStore
- from .key import PrivateKey
def b64encode(s):
    """Return the base64 encoding of *s*.

    The input is passed through ``str_to_bytes`` before encoding and the
    encoded result is passed through ``bytes_to_str`` before being returned.
    """
    raw = str_to_bytes(s)
    encoded = base64.b64encode(raw)
    return bytes_to_str(encoded)
def b64decode(s):
    """Return the base64-decoded form of *s*.

    The input is passed through ``str_to_bytes`` before it is decoded.
    """
    raw = str_to_bytes(s)
    return base64.b64decode(raw)
- class SecureSerializer(object):
- def __init__(self, key=None, cert=None, cert_store=None,
- digest="sha1", serializer="json"):
- self._key = key
- self._cert = cert
- self._cert_store = cert_store
- self._digest = digest
- self._serializer = serializer
- def serialize(self, data):
- """serialize data structure into string"""
- assert self._key is not None
- assert self._cert is not None
- try:
- content_type, content_encoding, body = encode(
- data, serializer=self._serializer)
- # What we sign is the serialized body, not the body itself.
- # this way the receiver doesn't have to decode the contents
- # to verify the signature (and thus avoiding potential flaws
- # in the decoding step).
- return self._pack(body, content_type, content_encoding,
- signature=self._key.sign(body, self._digest),
- signer=self._cert.get_id())
- except Exception, exc:
- raise SecurityError, SecurityError(
- "Unable to serialize: %r" % (exc, )), sys.exc_info()[2]
- def deserialize(self, data):
- """deserialize data structure from string"""
- assert self._cert_store is not None
- try:
- payload = self._unpack(data)
- signature, signer, body = (payload["signature"],
- payload["signer"],
- payload["body"])
- self._cert_store[signer].verify(body,
- signature, self._digest)
- except Exception, exc:
- raise SecurityError, SecurityError(
- "Unable to deserialize: %r" % (exc, )), sys.exc_info()[2]
- return decode(body, payload["content_type"],
- payload["content_encoding"], force=True)
- def _pack(self, body, content_type, content_encoding, signer, signature,
- sep='\x00\x01'):
- return b64encode(sep.join([signer, signature,
- content_type, content_encoding, body]))
- def _unpack(self, payload, sep='\x00\x01',
- fields=("signer", "signature", "content_type",
- "content_encoding", "body")):
- return dict(zip(fields, b64decode(payload).split(sep)))
def register_auth(key=None, cert=None, store=None, digest="sha1",
                  serializer="json"):
    """Register the signing serializer under the name ``"auth"``.

    Each of ``key``, ``cert`` and ``store`` is wrapped in ``PrivateKey``,
    ``Certificate`` and ``FSCertStore`` respectively when truthy, and is
    otherwise handed to ``SecureSerializer`` unchanged.
    """
    private_key = PrivateKey(key) if key else key
    certificate = Certificate(cert) if cert else cert
    cert_store = FSCertStore(store) if store else store
    secure = SecureSerializer(private_key, certificate, cert_store,
                              digest=digest, serializer=serializer)
    registry.register("auth", secure.serialize, secure.deserialize,
                      content_type="application/data",
                      content_encoding="utf-8")
|