test_security.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. """
  2. Keys and certificates for tests (KEY1 is a private key of CERT1, etc.)
  3. Generated with::
  4. $ openssl genrsa -des3 -passout pass:test -out key1.key 1024
  5. $ openssl req -new -key key1.key -out cert1.csr -passin pass:test
  6. $ cp key1.key key1.key.org
  7. $ openssl rsa -in key1.key.org -out key1.key -passin pass:test
  8. $ openssl x509 -req -days 365 -in cert1.csr \
  9. -signkey key1.key -out cert1.crt
  10. $ rm key1.key.org cert1.csr
  11. """
  12. from __future__ import absolute_import
  13. from __future__ import with_statement
  14. import __builtin__
  15. from mock import Mock, patch
  16. from celery import current_app
  17. from celery.exceptions import ImproperlyConfigured
  18. from celery.security import setup_security, disable_untrusted_serializers
  19. from kombu.serialization import registry
  20. from .case import SecurityCase
  21. from celery.tests.utils import mock_open
  22. class test_security(SecurityCase):
  23. def tearDown(self):
  24. registry._disabled_content_types.clear()
  25. def test_disable_untrusted_serializers(self):
  26. disabled = registry._disabled_content_types
  27. self.assertEqual(0, len(disabled))
  28. disable_untrusted_serializers(
  29. ['application/json', 'application/x-python-serialize'])
  30. self.assertIn('application/x-yaml', disabled)
  31. self.assertNotIn('application/json', disabled)
  32. self.assertNotIn('application/x-python-serialize', disabled)
  33. disabled.clear()
  34. disable_untrusted_serializers()
  35. self.assertIn('application/x-yaml', disabled)
  36. self.assertIn('application/json', disabled)
  37. self.assertIn('application/x-python-serialize', disabled)
  38. def test_setup_security(self):
  39. disabled = registry._disabled_content_types
  40. self.assertEqual(0, len(disabled))
  41. current_app.conf.CELERY_TASK_SERIALIZER = 'json'
  42. setup_security()
  43. self.assertIn('application/x-python-serialize', disabled)
  44. disabled.clear()
  45. @patch("celery.security.register_auth")
  46. @patch("celery.security.disable_untrusted_serializers")
  47. def test_setup_registry_complete(self, dis, reg, key="KEY", cert="CERT"):
  48. calls = [0]
  49. def effect(*args):
  50. try:
  51. m = Mock()
  52. m.read.return_value = "B" if calls[0] else "A"
  53. return m
  54. finally:
  55. calls[0] += 1
  56. with mock_open(side_effect=effect):
  57. store = Mock()
  58. setup_security(["json"], key, cert, store)
  59. dis.assert_called_with(["json"])
  60. reg.assert_called_with("A", "B", store)
  61. def test_security_conf(self):
  62. current_app.conf.CELERY_TASK_SERIALIZER = 'auth'
  63. self.assertRaises(ImproperlyConfigured, setup_security)
  64. _import = __builtin__.__import__
  65. def import_hook(name, *args, **kwargs):
  66. if name == 'OpenSSL':
  67. raise ImportError
  68. return _import(name, *args, **kwargs)
  69. __builtin__.__import__ = import_hook
  70. self.assertRaises(ImproperlyConfigured, setup_security)
  71. __builtin__.__import__ = _import