
security: Use kombu disable_insecure_serializers

Fixes security tests
Ask Solem 11 years ago
parent
commit
b51d3ed219

+ 9 - 8
celery/security/__init__.py

@@ -8,7 +8,9 @@
 """
 from __future__ import absolute_import
 
-from kombu.serialization import registry
+from kombu.serialization import (
+    registry, disable_insecure_serializers as _disable_insecure_serializers,
+)
 
 from celery.exceptions import ImproperlyConfigured
 
@@ -31,12 +33,7 @@ configuration settings to use the auth serializer.
 Please see the configuration reference for more information.
 """
 
-
-def disable_untrusted_serializers(whitelist=None):
-    for name in set(registry._decoders) - set(whitelist or []):
-        registry.disable(name)
-    for name in whitelist or []:
-        registry.enable(name)
+__all__ = ['setup_security']
 
 
 def setup_security(allowed_serializers=None, key=None, cert=None, store=None,
@@ -46,7 +43,7 @@ def setup_security(allowed_serializers=None, key=None, cert=None, store=None,
         from celery import current_app
         app = current_app._get_current_object()
 
-    disable_untrusted_serializers(allowed_serializers)
+    _disable_insecure_serializers(allowed_serializers)
 
     conf = app.conf
     if conf.CELERY_TASK_SERIALIZER != 'auth':
@@ -68,3 +65,7 @@ def setup_security(allowed_serializers=None, key=None, cert=None, store=None,
         with open(cert) as cf:
             register_auth(kf.read(), cf.read(), store, digest, serializer)
     registry._set_default_serializer('auth')
+
+
+def disable_untrusted_serializers(whitelist=None):
+    _disable_insecure_serializers(allowed=whitelist)
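Review bot: The compatibility wrapper `disable_untrusted_serializers` re-added at the end of `celery/security/__init__.py` has no docstring and is left out of the new `__all__ = ['setup_security']`, so nothing tells a reader whether it is still supported or what `whitelist=None` means. I would add a one-line docstring such as `"""Compat alias for kombu's disable_insecure_serializers; whitelist=None disables every serializer."""`, which matches what the updated test asserts for `allowed=None`. At the call in `setup_security`, a comment like `# every name in allowed_serializers is re-enabled, so listing pickle or yaml accepts those payloads again` would make the trust decision visible; the updated test shows listed content types leaving the disabled set. The body of `setup_security` continues outside the hunks shown, so whether `allowed_serializers` is validated elsewhere cannot be seen from here.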

+ 2 - 0
celery/security/certificate.py

@@ -18,6 +18,8 @@ from celery.five import values
 
 from .utils import crypto, reraise_errors
 
+__all__ = ['Certificate', 'CertStore', 'FSCertStore']
+
 
 class Certificate(object):
     """X.509 certificate."""

+ 2 - 0
celery/security/key.py

@@ -12,6 +12,8 @@ from kombu.utils.encoding import ensure_bytes
 
 from .utils import crypto, reraise_errors
 
+__all__ = ['PrivateKey']
+
 
 class PrivateKey(object):
 

+ 2 - 0
celery/security/serialization.py

@@ -17,6 +17,8 @@ from .certificate import Certificate, FSCertStore
 from .key import PrivateKey
 from .utils import reraise_errors
 
+__all__ = ['SecureSerializer', 'register_auth']
+
 
 def b64encode(s):
     return bytes_to_str(base64.b64encode(str_to_bytes(s)))

+ 2 - 0
celery/security/utils.py

@@ -20,6 +20,8 @@ try:
 except ImportError:  # pragma: no cover
     crypto = None    # noqa
 
+__all__ = ['reraise_errors']
+
 
 @contextmanager
 def reraise_errors(msg='{0!r}', errors=None):

+ 21 - 16
celery/tests/security/test_security.py

@@ -18,9 +18,10 @@ from __future__ import absolute_import
 
 from mock import Mock, patch
 
+from kombu.serialization import disable_insecure_serializers
+
 from celery.exceptions import ImproperlyConfigured, SecurityError
 from celery.five import builtins
-from celery.security import disable_untrusted_serializers
 from celery.security.utils import reraise_errors
 from kombu.serialization import registry
 
@@ -34,21 +35,25 @@ class test_security(SecurityCase):
     def tearDown(self):
         registry._disabled_content_types.clear()
 
-    def test_disable_untrusted_serializers(self):
-        disabled = registry._disabled_content_types
-        self.assertTrue(disabled)
-
-        disable_untrusted_serializers(
-            ['application/json', 'application/x-python-serialize'])
-        self.assertIn('application/x-yaml', disabled)
-        self.assertNotIn('application/json', disabled)
-        self.assertNotIn('application/x-python-serialize', disabled)
-        disabled.clear()
+    def test_disable_insecure_serializers(self):
+        try:
+            disabled = registry._disabled_content_types
+            self.assertTrue(disabled)
+
+            disable_insecure_serializers(
+                ['application/json', 'application/x-python-serialize'],
+            )
+            self.assertIn('application/x-yaml', disabled)
+            self.assertNotIn('application/json', disabled)
+            self.assertNotIn('application/x-python-serialize', disabled)
+            disabled.clear()
 
-        disable_untrusted_serializers()
-        self.assertIn('application/x-yaml', disabled)
-        self.assertIn('application/json', disabled)
-        self.assertIn('application/x-python-serialize', disabled)
+            disable_insecure_serializers(allowed=None)
+            self.assertIn('application/x-yaml', disabled)
+            self.assertIn('application/json', disabled)
+            self.assertIn('application/x-python-serialize', disabled)
+        finally:
+            disable_insecure_serializers(allowed=['json'])
 
     def test_setup_security(self):
         disabled = registry._disabled_content_types
@@ -64,7 +69,7 @@ class test_security(SecurityCase):
             self.app.conf.CELERY_TASK_SERIALIZER = prev
 
     @patch('celery.security.register_auth')
-    @patch('celery.security.disable_untrusted_serializers')
+    @patch('celery.security._disable_insecure_serializers')
     def test_setup_registry_complete(self, dis, reg, key='KEY', cert='CERT'):
         calls = [0]