Browse Source

- Added unit test case to better test signing and show the bug. Current test only ensures that you can sign a fixed string. Replaced with a test that generates lots of random strings. Old method reliably fails the test in under ~2000 passes.

- Reworked auth serialization code to use the sized of the signature for splitting up the string as well as seperator splitting. This should be backwards compatible, in fact the encoding method remains unchanged.
Matt Woodyard 12 years ago
parent
commit
52020ff708
2 changed files with 32 additions and 6 deletions
  1. 25 6
      celery/security/serialization.py
  2. 7 0
      celery/tests/security/test_serialization.py

+ 25 - 6
celery/security/serialization.py

@@ -73,16 +73,33 @@ class SecureSerializer(object):
         return b64encode(fields)
 
     def _unpack(self, payload, sep=str_to_bytes('\x00\x01')):
-        values = b64decode(ensure_bytes(payload)).split(sep)
+        raw_payload = b64decode(ensure_bytes(payload))
+        first_sep = raw_payload.find(sep)
+
+        signer = raw_payload[:first_sep]
+        signer_cert = self._cert_store[signer]
+
+        sig_len = signer_cert._cert.get_pubkey().bits() >> 3
+        signature = raw_payload[first_sep+len(sep):first_sep+len(sep)+sig_len]
+        end_of_sig = first_sep+len(sep)+sig_len+len(sep)
+        
+        v = raw_payload[end_of_sig:].split(sep)
+
+        values = [bytes_to_str(signer), bytes_to_str(signature),
+                  bytes_to_str(v[0]), bytes_to_str(v[1]), bytes_to_str(v[2])]
+                  
+
         return {
-            'signer': bytes_to_str(values[0]),
-            'signature': ensure_bytes(values[1]),
-            'content_type': bytes_to_str(values[2]),
-            'content_encoding': bytes_to_str(values[3]),
-            'body': ensure_bytes(values[4]),
+            'signer': values[0],
+            'signature': values[1],
+            'content_type': values[2],
+            'content_encoding': values[3],
+            'body': values[4],
         }
 
 
+
+
 def register_auth(key=None, cert=None, store=None, digest='sha1',
                   serializer='json'):
     """register security serializer"""
@@ -93,3 +110,5 @@ def register_auth(key=None, cert=None, store=None, digest='sha1',
     registry.register('auth', s.serialize, s.deserialize,
                       content_type='application/data',
                       content_encoding='utf-8')
+
+    registry._set_default_serializer('auth')

+ 7 - 0
celery/tests/security/test_serialization.py

@@ -10,6 +10,7 @@ from celery.security.key import PrivateKey
 from . import CERT1, CERT2, KEY1, KEY2
 from .case import SecurityCase
 
+import os,base64
 
 class test_SecureSerializer(SecurityCase):
 
@@ -53,3 +54,9 @@ class test_SecureSerializer(SecurityCase):
     def test_register_auth(self):
         register_auth(KEY1, CERT1, '')
         self.assertIn('application/data', registry._decoders)
+
+    def test_lots_of_sign(self):
+        for i in range(1000):
+            rdata = base64.urlsafe_b64encode(os.urandom(265))
+            s = self._get_s(KEY1, CERT1, [CERT1])
+            self.assertEqual(s.deserialize(s.serialize(rdata)), rdata)