| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 | # -*- coding: utf-8 -*-from __future__ import absolute_import, unicode_literalsfrom decimal import Decimalimport pytestfrom case import MagicMock, Mock, patch, sentinel, skipfrom celery.backends import dynamodb as modulefrom celery.backends.dynamodb import DynamoDBBackendfrom celery.exceptions import ImproperlyConfiguredfrom celery.five import string@skip.unless_module('boto3')class test_DynamoDBBackend:    def setup(self):        self._static_timestamp = Decimal(1483425566.52)  # noqa        self.app.conf.result_backend = 'dynamodb://'    @property    def backend(self):        """:rtype: DynamoDBBackend"""        return self.app.backend    def test_init_no_boto3(self):        prev, module.boto3 = module.boto3, None        try:            with pytest.raises(ImproperlyConfigured):                DynamoDBBackend(app=self.app)        finally:            module.boto3 = prev    def test_init_aws_credentials(self):        with pytest.raises(ImproperlyConfigured):            DynamoDBBackend(                app=self.app,                url='dynamodb://a:@'            )    def test_get_client_local(self):        table_creation_path = \            'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table'        with patch('boto3.client') as mock_boto_client, \                patch(table_creation_path):                backend = DynamoDBBackend(                    app=self.app,                    url='dynamodb://@localhost:8000'                )                client = backend._get_client()                assert backend.client is client                mock_boto_client.assert_called_once_with(                    'dynamodb',                    endpoint_url='http://localhost:8000',                    region_name='us-east-1'                )                assert backend.endpoint_url == 'http://localhost:8000'    def test_get_client_credentials(self):        table_creation_path = \            'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table'        with patch('boto3.client') as mock_boto_client, \                patch(table_creation_path):                backend = DynamoDBBackend(                    app=self.app,                    url='dynamodb://key:secret@test'                )                client = backend._get_client()                assert client is backend.client                mock_boto_client.assert_called_once_with(                    'dynamodb',                    aws_access_key_id='key',                    aws_secret_access_key='secret',                    region_name='test'                )                assert backend.aws_region == 'test'    def test_get_or_create_table_not_exists(self):        self.backend._client = MagicMock()        mock_create_table = self.backend._client.create_table = MagicMock()        mock_describe_table = self.backend._client.describe_table = \            MagicMock()        mock_describe_table.return_value = {            'Table': {                'TableStatus': 'ACTIVE'            }        }        self.backend._get_or_create_table()        mock_create_table.assert_called_once_with(            **self.backend._get_table_schema()        )    def test_get_or_create_table_already_exists(self):        from botocore.exceptions import ClientError        self.backend._client = MagicMock()        mock_create_table = self.backend._client.create_table = MagicMock()        client_error = ClientError(            {                'Error': {                    'Code': 'ResourceInUseException',                    'Message': 'Table already exists: {}'.format(                        self.backend.table_name                    )                }            },            'CreateTable'        )        mock_create_table.side_effect = client_error        mock_describe_table = self.backend._client.describe_table = \            MagicMock()        mock_describe_table.return_value = {            'Table': {                'TableStatus': 'ACTIVE'            }        }        self.backend._get_or_create_table()        mock_describe_table.assert_called_once_with(            TableName=self.backend.table_name        )    def test_wait_for_table_status(self):        self.backend._client = MagicMock()        mock_describe_table = self.backend._client.describe_table = \            MagicMock()        mock_describe_table.side_effect = [            {'Table': {                'TableStatus': 'CREATING'            }},            {'Table': {                'TableStatus': 'SOME_STATE'            }}        ]        self.backend._wait_for_table_status(expected='SOME_STATE')        assert mock_describe_table.call_count == 2    def test_prepare_get_request(self):        expected = {            'TableName': u'celery',            'Key': {u'id': {u'S': u'abcdef'}}        }        assert self.backend._prepare_get_request('abcdef') == expected    def test_prepare_put_request(self):        expected = {            'TableName': u'celery',            'Item': {                u'id': {u'S': u'abcdef'},                u'result': {u'B': u'val'},                u'timestamp': {                    u'N': str(Decimal(self._static_timestamp))                }            }        }        with patch('celery.backends.dynamodb.time', self._mock_time):            result = self.backend._prepare_put_request('abcdef', 'val')        assert result == expected    def test_item_to_dict(self):        boto_response = {            'Item': {                'id': {                    'S': sentinel.key                },                'result': {                    'B': sentinel.value                },                'timestamp': {                    'N': Decimal(1)                }            }        }        converted = self.backend._item_to_dict(boto_response)        assert converted == {            'id': sentinel.key,            'result': sentinel.value,            'timestamp': Decimal(1)        }    def test_get(self):        self.backend._client = Mock(name='_client')        self.backend._client.get_item = MagicMock()        assert self.backend.get('1f3fab') is None        self.backend.client.get_item.assert_called_once_with(            Key={u'id': {u'S': u'1f3fab'}},            TableName='celery'        )    def _mock_time(self):        return self._static_timestamp    def test_set(self):        self.backend._client = MagicMock()        self.backend._client.put_item = MagicMock()        # should return None        with patch('celery.backends.dynamodb.time', self._mock_time):            assert self.backend.set(sentinel.key, sentinel.value) is None        assert self.backend._client.put_item.call_count == 1        _, call_kwargs = self.backend._client.put_item.call_args        expected_kwargs = dict(            Item={                u'timestamp': {u'N': str(self._static_timestamp)},                u'id': {u'S': string(sentinel.key)},                u'result': {u'B': sentinel.value}            },            TableName='celery'        )        assert call_kwargs['Item'] == expected_kwargs['Item']        assert call_kwargs['TableName'] == 'celery'    def test_delete(self):        self.backend._client = Mock(name='_client')        mocked_delete = self.backend._client.delete = Mock('client.delete')        mocked_delete.return_value = None        # should return None        assert self.backend.delete('1f3fab') is None        self.backend.client.delete_item.assert_called_once_with(            Key={u'id': {u'S': u'1f3fab'}},            TableName='celery'        )    def test_backend_by_url(self, url='dynamodb://'):        from celery.app import backends        from celery.backends.dynamodb import DynamoDBBackend        backend, url_ = backends.by_url(url, self.app.loader)        assert backend is DynamoDBBackend        assert url_ == url    def test_backend_params_by_url(self):        self.app.conf.result_backend = \            'dynamodb://@us-east-1/celery_results?read=10&write=20'        assert self.backend.aws_region == 'us-east-1'        assert self.backend.table_name == 'celery_results'        assert self.backend.read_capacity_units == 10        assert self.backend.write_capacity_units == 20        assert self.backend.endpoint_url is None
 |