123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- # -*- coding: utf-8 -*-
- """
- celery.security.serialization
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Secure serializer.
- """
- from __future__ import absolute_import
- import base64
- from kombu.serialization import registry, dumps, loads
- from kombu.utils.encoding import bytes_to_str, str_to_bytes, ensure_bytes
- from .certificate import Certificate, FSCertStore
- from .key import PrivateKey
- from .utils import reraise_errors
- __all__ = ['SecureSerializer', 'register_auth']
def b64encode(s):
    """Base64-encode *s* and return the encoded result.

    The input is passed through ``str_to_bytes`` before encoding, and the
    encoded output is passed through ``bytes_to_str`` before being returned.
    """
    raw = str_to_bytes(s)
    encoded = base64.b64encode(raw)
    return bytes_to_str(encoded)
def b64decode(s):
    """Decode base64 input *s* and return the decoded bytes.

    The input is passed through ``str_to_bytes`` before decoding.
    """
    encoded = str_to_bytes(s)
    return base64.b64decode(encoded)
class SecureSerializer(object):
    """Serializer that signs outgoing payloads and verifies incoming ones.

    :param key: signing key object; :meth:`serialize` calls
        ``key.sign(body, digest)`` on it.
    :param cert: signer certificate object; :meth:`serialize` calls
        ``cert.get_id()`` on it to label the payload.
    :param cert_store: mapping indexed by signer id; the looked-up entry's
        ``verify(body, signature, digest)`` is called by :meth:`deserialize`.
    :param digest: digest name handed to ``sign`` and ``verify``.
    :param serializer: serializer name handed to ``dumps`` for the inner
        payload.
    """

    def __init__(self, key=None, cert=None, cert_store=None,
                 digest='sha1', serializer='json'):
        self._key = key
        self._cert = cert
        self._cert_store = cert_store
        self._digest = digest
        self._serializer = serializer

    def serialize(self, data):
        """serialize data structure into string

        Requires both ``key`` and ``cert`` to have been supplied.  Any
        exception raised while serializing, signing or packing is passed
        through ``reraise_errors``.
        """
        # NOTE: asserts are stripped when Python runs with -O; they are kept
        # so the exception type seen by existing callers does not change.
        assert self._key is not None
        assert self._cert is not None
        with reraise_errors('Unable to serialize: {0!r}', (Exception, )):
            content_type, content_encoding, body = dumps(
                data, serializer=self._serializer)
            # What we sign is the serialized body, not the original data
            # structure.  This way the receiver doesn't have to decode the
            # contents to verify the signature (and thus avoids potential
            # flaws in the decoding step).
            body = ensure_bytes(body)
            return self._pack(body, content_type, content_encoding,
                              signature=self._key.sign(body, self._digest),
                              signer=self._cert.get_id())

    def deserialize(self, data):
        """deserialize data structure from string

        Requires ``cert_store`` to have been supplied.  The signature is
        verified against the still-serialized body before the body is
        decoded.  Any exception raised along the way is passed through
        ``reraise_errors``.
        """
        # NOTE: stripped under -O, see serialize().
        assert self._cert_store is not None
        with reraise_errors('Unable to deserialize: {0!r}', (Exception, )):
            payload = self._unpack(data)
            signature, signer, body = (payload['signature'],
                                       payload['signer'],
                                       payload['body'])
            self._cert_store[signer].verify(body, signature, self._digest)
            return loads(bytes_to_str(body), payload['content_type'],
                         payload['content_encoding'], force=True)

    def _pack(self, body, content_type, content_encoding, signer, signature,
              sep=str_to_bytes('\x00\x01')):
        """Join the fields with *sep* and base64-encode the result.

        Field order on the wire is: signer, signature, content_type,
        content_encoding, body.
        """
        fields = sep.join(
            ensure_bytes(s) for s in [signer, signature, content_type,
                                      content_encoding, body]
        )
        return b64encode(fields)

    def _unpack(self, payload, sep=str_to_bytes('\x00\x01')):
        """Split a packed payload back into its five fields.

        Returns a dict with the keys ``signer``, ``signature``,
        ``content_type``, ``content_encoding`` and ``body``.

        Raises ``ValueError`` if *sep* does not occur in the decoded payload
        and ``IndexError`` if fewer than three fields follow the signature.
        """
        raw_payload = b64decode(ensure_bytes(payload))
        first_sep = raw_payload.find(sep)
        if first_sep < 0:
            # find() returns -1 when the separator is missing; slicing with
            # it would silently treat all but the last byte as the signer.
            raise ValueError('Malformed payload: field separator not found')
        sep_len = len(sep)

        signer = raw_payload[:first_sep]
        signer_cert = self._cert_store[signer]

        # The signature is extracted by length rather than by searching for
        # the separator, so separator bytes occurring inside it are harmless.
        # bits() >> 3 turns the public key's bit count into a byte count.
        sig_len = signer_cert._cert.get_pubkey().bits() >> 3
        sig_start = first_sep + sep_len
        sig_end = sig_start + sig_len
        signature = raw_payload[sig_start:sig_end]

        # Split at most twice: the body is the last field and may itself
        # contain the separator bytes, which must not truncate it.
        rest = raw_payload[sig_end + sep_len:].split(sep, 2)

        # NOTE(review): signature is binary data; confirm bytes_to_str
        # round-trips it unchanged on the supported Python versions.
        return {
            'signer': bytes_to_str(signer),
            'signature': bytes_to_str(signature),
            'content_type': bytes_to_str(rest[0]),
            'content_encoding': bytes_to_str(rest[1]),
            'body': bytes_to_str(rest[2]),
        }
def register_auth(key=None, cert=None, store=None, digest='sha1',
                  serializer='json'):
    """register security serializer

    Builds a ``SecureSerializer`` from the given arguments and registers its
    ``serialize``/``deserialize`` methods in the kombu registry under the
    name ``'auth'``.  Each of *key*, *cert* and *store* is wrapped in
    ``PrivateKey``, ``Certificate`` and ``FSCertStore`` respectively only
    when truthy; a falsy value is passed through unchanged.
    """
    private_key = PrivateKey(key) if key else key
    certificate = Certificate(cert) if cert else cert
    cert_store = FSCertStore(store) if store else store
    secure = SecureSerializer(private_key, certificate, cert_store,
                              digest=digest, serializer=serializer)
    registry.register('auth', secure.serialize, secure.deserialize,
                      content_type='application/data',
                      content_encoding='utf-8')
|