serialization.py 3.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. # -*- coding: utf-8 -*-
  2. """Secure serializer."""
  3. from __future__ import absolute_import, unicode_literals
  4. from kombu.serialization import registry, dumps, loads
  5. from kombu.utils.encoding import bytes_to_str, str_to_bytes, ensure_bytes
  6. from celery.five import bytes_if_py2
  7. from celery.utils.serialization import b64encode, b64decode
  8. from .certificate import Certificate, FSCertStore
  9. from .key import PrivateKey
  10. from .utils import reraise_errors
  11. __all__ = ('SecureSerializer', 'register_auth')
  12. class SecureSerializer(object):
  13. """Signed serializer."""
  14. def __init__(self, key=None, cert=None, cert_store=None,
  15. digest='sha1', serializer='json'):
  16. self._key = key
  17. self._cert = cert
  18. self._cert_store = cert_store
  19. self._digest = bytes_if_py2(digest)
  20. self._serializer = serializer
  21. def serialize(self, data):
  22. """Serialize data structure into string."""
  23. assert self._key is not None
  24. assert self._cert is not None
  25. with reraise_errors('Unable to serialize: {0!r}', (Exception,)):
  26. content_type, content_encoding, body = dumps(
  27. bytes_to_str(data), serializer=self._serializer)
  28. # What we sign is the serialized body, not the body itself.
  29. # this way the receiver doesn't have to decode the contents
  30. # to verify the signature (and thus avoiding potential flaws
  31. # in the decoding step).
  32. body = ensure_bytes(body)
  33. return self._pack(body, content_type, content_encoding,
  34. signature=self._key.sign(body, self._digest),
  35. signer=self._cert.get_id())
  36. def deserialize(self, data):
  37. """Deserialize data structure from string."""
  38. assert self._cert_store is not None
  39. with reraise_errors('Unable to deserialize: {0!r}', (Exception,)):
  40. payload = self._unpack(data)
  41. signature, signer, body = (payload['signature'],
  42. payload['signer'],
  43. payload['body'])
  44. self._cert_store[signer].verify(body, signature, self._digest)
  45. return loads(bytes_to_str(body), payload['content_type'],
  46. payload['content_encoding'], force=True)
  47. def _pack(self, body, content_type, content_encoding, signer, signature,
  48. sep=str_to_bytes('\x00\x01')):
  49. fields = sep.join(
  50. ensure_bytes(s) for s in [signer, signature, content_type,
  51. content_encoding, body]
  52. )
  53. return b64encode(fields)
  54. def _unpack(self, payload, sep=str_to_bytes('\x00\x01')):
  55. raw_payload = b64decode(ensure_bytes(payload))
  56. first_sep = raw_payload.find(sep)
  57. signer = raw_payload[:first_sep]
  58. signer_cert = self._cert_store[signer]
  59. sig_len = signer_cert._cert.get_pubkey().bits() >> 3
  60. signature = raw_payload[
  61. first_sep + len(sep):first_sep + len(sep) + sig_len
  62. ]
  63. end_of_sig = first_sep + len(sep) + sig_len + len(sep)
  64. v = raw_payload[end_of_sig:].split(sep)
  65. return {
  66. 'signer': signer,
  67. 'signature': signature,
  68. 'content_type': bytes_to_str(v[0]),
  69. 'content_encoding': bytes_to_str(v[1]),
  70. 'body': bytes_to_str(v[2]),
  71. }
  72. def register_auth(key=None, cert=None, store=None, digest='sha1',
  73. serializer='json'):
  74. """Register security serializer."""
  75. s = SecureSerializer(key and PrivateKey(key),
  76. cert and Certificate(cert),
  77. store and FSCertStore(store),
  78. digest=digest, serializer=serializer)
  79. registry.register('auth', s.serialize, s.deserialize,
  80. content_type='application/data',
  81. content_encoding='utf-8')