test_security.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. """Keys and certificates for tests (KEY1 is a private key of CERT1, etc.)
  2. Generated with:
  3. .. code-block:: console
  4. $ openssl genrsa -des3 -passout pass:test -out key1.key 1024
  5. $ openssl req -new -key key1.key -out cert1.csr -passin pass:test
  6. $ cp key1.key key1.key.org
  7. $ openssl rsa -in key1.key.org -out key1.key -passin pass:test
  8. $ openssl x509 -req -days 365 -in cert1.csr \
  9. -signkey key1.key -out cert1.crt
  10. $ rm key1.key.org cert1.csr
  11. """
  12. from __future__ import absolute_import, unicode_literals
  13. import pytest
  14. from case import Mock, mock, patch
  15. from kombu.serialization import disable_insecure_serializers, registry
  16. from celery.exceptions import ImproperlyConfigured, SecurityError
  17. from celery.five import builtins
  18. from celery.security import disable_untrusted_serializers, setup_security
  19. from celery.security.utils import reraise_errors
  20. from .case import SecurityCase
  21. class test_security(SecurityCase):
  22. def teardown(self):
  23. registry._disabled_content_types.clear()
  24. def test_disable_insecure_serializers(self):
  25. try:
  26. disabled = registry._disabled_content_types
  27. assert disabled
  28. disable_insecure_serializers(
  29. ['application/json', 'application/x-python-serialize'],
  30. )
  31. assert 'application/x-yaml' in disabled
  32. assert 'application/json' not in disabled
  33. assert 'application/x-python-serialize' not in disabled
  34. disabled.clear()
  35. disable_insecure_serializers(allowed=None)
  36. assert 'application/x-yaml' in disabled
  37. assert 'application/json' in disabled
  38. assert 'application/x-python-serialize' in disabled
  39. finally:
  40. disable_insecure_serializers(allowed=['json'])
  41. @patch('celery.security._disable_insecure_serializers')
  42. def test_disable_untrusted_serializers(self, disable):
  43. disable_untrusted_serializers(['foo'])
  44. disable.assert_called_with(allowed=['foo'])
  45. def test_setup_security(self):
  46. disabled = registry._disabled_content_types
  47. assert len(disabled) == 0
  48. self.app.conf.task_serializer = 'json'
  49. self.app.setup_security()
  50. assert 'application/x-python-serialize' in disabled
  51. disabled.clear()
  52. @patch('celery.current_app')
  53. def test_setup_security__default_app(self, current_app):
  54. setup_security()
  55. @patch('celery.security.register_auth')
  56. @patch('celery.security._disable_insecure_serializers')
  57. def test_setup_registry_complete(self, dis, reg, key='KEY', cert='CERT'):
  58. calls = [0]
  59. def effect(*args):
  60. try:
  61. m = Mock()
  62. m.read.return_value = 'B' if calls[0] else 'A'
  63. return m
  64. finally:
  65. calls[0] += 1
  66. self.app.conf.task_serializer = 'auth'
  67. with mock.open(side_effect=effect):
  68. with patch('celery.security.registry') as registry:
  69. store = Mock()
  70. self.app.setup_security(['json'], key, cert, store)
  71. dis.assert_called_with(['json'])
  72. reg.assert_called_with('A', 'B', store, 'sha1', 'json')
  73. registry._set_default_serializer.assert_called_with('auth')
  74. def test_security_conf(self):
  75. self.app.conf.task_serializer = 'auth'
  76. with pytest.raises(ImproperlyConfigured):
  77. self.app.setup_security()
  78. _import = builtins.__import__
  79. def import_hook(name, *args, **kwargs):
  80. if name == 'OpenSSL':
  81. raise ImportError
  82. return _import(name, *args, **kwargs)
  83. builtins.__import__ = import_hook
  84. with pytest.raises(ImproperlyConfigured):
  85. self.app.setup_security()
  86. builtins.__import__ = _import
  87. def test_reraise_errors(self):
  88. with pytest.raises(SecurityError):
  89. with reraise_errors(errors=(KeyError,)):
  90. raise KeyError('foo')
  91. with pytest.raises(KeyError):
  92. with reraise_errors(errors=(ValueError,)):
  93. raise KeyError('bar')