123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- # -*- coding: utf-8 -*-
- from __future__ import absolute_import, unicode_literals
- from decimal import Decimal
- import pytest
- from case import MagicMock, Mock, patch, sentinel, skip
- from celery.backends import dynamodb as module
- from celery.backends.dynamodb import DynamoDBBackend
- from celery.exceptions import ImproperlyConfigured
- from celery.five import string
@skip.unless_module('boto3')
class test_DynamoDBBackend:
    """Unit tests for :class:`DynamoDBBackend`.

    The class is decorated with ``skip.unless_module('boto3')``.  Every test
    talks to mocks only: either ``boto3.client`` is patched, or the
    backend's private ``_client`` attribute is replaced by a mock.

    NOTE(review): ``self.app`` is not defined in this class; it is presumably
    injected by the project's test harness -- confirm against the conftest.
    """

    # Patch target used by the client tests so that building a backend does
    # not run the real table-creation routine.
    _TABLE_CREATION_PATH = \
        'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table'

    def setup(self):
        """Pin a timestamp and select the DynamoDB result backend."""
        # Fixed value returned by ``_mock_time`` so that stored items carry
        # a deterministic timestamp.
        self._static_timestamp = Decimal(1483425566.52)  # noqa
        self.app.conf.result_backend = 'dynamodb://'

    @property
    def backend(self):
        """Return the backend instance owned by the test app.

        :rtype: DynamoDBBackend
        """
        return self.app.backend

    def _mock_time(self):
        """Stand-in for ``celery.backends.dynamodb.time``."""
        return self._static_timestamp

    def test_init_no_boto3(self):
        """Constructing the backend without boto3 is a configuration error."""
        prev, module.boto3 = module.boto3, None
        try:
            with pytest.raises(ImproperlyConfigured):
                DynamoDBBackend(app=self.app)
        finally:
            # Always restore the module attribute for the other tests.
            module.boto3 = prev

    def test_init_aws_credentials(self):
        """An access key with an empty secret in the URL is rejected."""
        with pytest.raises(ImproperlyConfigured):
            DynamoDBBackend(
                app=self.app,
                url='dynamodb://a:@'
            )

    def test_get_client_explicit_endpoint(self):
        """``dynamodb_endpoint_url`` from the config reaches boto3."""
        with patch('boto3.client') as mock_boto_client, \
                patch(self._TABLE_CREATION_PATH):
            self.app.conf.dynamodb_endpoint_url = 'http://my.domain.com:666'
            backend = DynamoDBBackend(
                app=self.app,
                url='dynamodb://@us-east-1'
            )
            client = backend._get_client()
            assert backend.client is client
            mock_boto_client.assert_called_once_with(
                'dynamodb',
                endpoint_url='http://my.domain.com:666',
                region_name='us-east-1'
            )
            assert backend.endpoint_url == 'http://my.domain.com:666'

    def test_get_client_local(self):
        """A ``localhost:8000`` URL yields a local HTTP endpoint."""
        with patch('boto3.client') as mock_boto_client, \
                patch(self._TABLE_CREATION_PATH):
            backend = DynamoDBBackend(
                app=self.app,
                url='dynamodb://@localhost:8000'
            )
            client = backend._get_client()
            assert backend.client is client
            mock_boto_client.assert_called_once_with(
                'dynamodb',
                endpoint_url='http://localhost:8000',
                region_name='us-east-1'
            )
            assert backend.endpoint_url == 'http://localhost:8000'

    def test_get_client_credentials(self):
        """Key and secret from the URL are forwarded to boto3."""
        with patch('boto3.client') as mock_boto_client, \
                patch(self._TABLE_CREATION_PATH):
            backend = DynamoDBBackend(
                app=self.app,
                url='dynamodb://key:secret@test'
            )
            client = backend._get_client()
            assert client is backend.client
            mock_boto_client.assert_called_once_with(
                'dynamodb',
                aws_access_key_id='key',
                aws_secret_access_key='secret',
                region_name='test'
            )
            assert backend.aws_region == 'test'

    def test_get_or_create_table_not_exists(self):
        """``create_table`` is called once with the backend's schema."""
        self.backend._client = MagicMock()
        mock_create_table = self.backend._client.create_table = MagicMock()
        mock_describe_table = self.backend._client.describe_table = \
            MagicMock()
        mock_describe_table.return_value = {
            'Table': {
                'TableStatus': 'ACTIVE'
            }
        }

        self.backend._get_or_create_table()

        mock_create_table.assert_called_once_with(
            **self.backend._get_table_schema()
        )

    def test_get_or_create_table_already_exists(self):
        """A ``ResourceInUseException`` leads to one ``describe_table``."""
        from botocore.exceptions import ClientError

        self.backend._client = MagicMock()
        mock_create_table = self.backend._client.create_table = MagicMock()
        client_error = ClientError(
            {
                'Error': {
                    'Code': 'ResourceInUseException',
                    'Message': 'Table already exists: {}'.format(
                        self.backend.table_name
                    )
                }
            },
            'CreateTable'
        )
        mock_create_table.side_effect = client_error
        mock_describe_table = self.backend._client.describe_table = \
            MagicMock()
        mock_describe_table.return_value = {
            'Table': {
                'TableStatus': 'ACTIVE'
            }
        }

        self.backend._get_or_create_table()

        mock_describe_table.assert_called_once_with(
            TableName=self.backend.table_name
        )

    def test_wait_for_table_status(self):
        """The table is described until the expected status is reported."""
        self.backend._client = MagicMock()
        mock_describe_table = self.backend._client.describe_table = \
            MagicMock()
        # First answer is not the expected status, second one is.
        mock_describe_table.side_effect = [
            {'Table': {
                'TableStatus': 'CREATING'
            }},
            {'Table': {
                'TableStatus': 'SOME_STATE'
            }}
        ]

        self.backend._wait_for_table_status(expected='SOME_STATE')

        assert mock_describe_table.call_count == 2

    def test_prepare_get_request(self):
        """The get request carries the table name and the string key."""
        expected = {
            'TableName': u'celery',
            'Key': {u'id': {u'S': u'abcdef'}}
        }
        assert self.backend._prepare_get_request('abcdef') == expected

    def test_prepare_put_request(self):
        """The put request carries id, result and the pinned timestamp."""
        expected = {
            'TableName': u'celery',
            'Item': {
                u'id': {u'S': u'abcdef'},
                u'result': {u'B': u'val'},
                u'timestamp': {
                    u'N': str(self._static_timestamp)
                }
            }
        }
        with patch('celery.backends.dynamodb.time', self._mock_time):
            result = self.backend._prepare_put_request('abcdef', 'val')
        assert result == expected

    def test_item_to_dict(self):
        """A typed ``Item`` response is flattened to field/value pairs."""
        boto_response = {
            'Item': {
                'id': {
                    'S': sentinel.key
                },
                'result': {
                    'B': sentinel.value
                },
                'timestamp': {
                    'N': Decimal(1)
                }
            }
        }

        converted = self.backend._item_to_dict(boto_response)

        assert converted == {
            'id': sentinel.key,
            'result': sentinel.value,
            'timestamp': Decimal(1)
        }

    def test_get(self):
        """``get`` queries ``get_item`` once and returns ``None`` here."""
        self.backend._client = Mock(name='_client')
        self.backend._client.get_item = MagicMock()

        assert self.backend.get('1f3fab') is None
        self.backend.client.get_item.assert_called_once_with(
            Key={u'id': {u'S': u'1f3fab'}},
            TableName='celery'
        )

    def test_set(self):
        """``set`` issues exactly one ``put_item`` with the expected item."""
        self.backend._client = MagicMock()
        self.backend._client.put_item = MagicMock()

        # should return None
        with patch('celery.backends.dynamodb.time', self._mock_time):
            assert self.backend.set(sentinel.key, sentinel.value) is None

        assert self.backend._client.put_item.call_count == 1
        _, call_kwargs = self.backend._client.put_item.call_args
        expected_item = {
            u'timestamp': {u'N': str(self._static_timestamp)},
            u'id': {u'S': string(sentinel.key)},
            u'result': {u'B': sentinel.value}
        }
        assert call_kwargs['Item'] == expected_item
        assert call_kwargs['TableName'] == 'celery'

    def test_delete(self):
        """``delete`` issues exactly one ``delete_item`` for the key."""
        self.backend._client = Mock(name='_client')
        # Stub the method that is asserted on below.  ``name`` must be a
        # keyword: the first positional argument of ``Mock`` is ``spec``.
        mocked_delete_item = self.backend._client.delete_item = Mock(
            name='client.delete_item'
        )
        mocked_delete_item.return_value = None

        # should return None
        assert self.backend.delete('1f3fab') is None
        self.backend.client.delete_item.assert_called_once_with(
            Key={u'id': {u'S': u'1f3fab'}},
            TableName='celery'
        )

    def test_backend_by_url(self, url='dynamodb://'):
        """``backends.by_url`` maps the scheme to ``DynamoDBBackend``."""
        from celery.app import backends

        backend, url_ = backends.by_url(url, self.app.loader)
        assert backend is DynamoDBBackend
        assert url_ == url

    def test_backend_params_by_url(self):
        """Region, table name and capacity units are read from the URL."""
        self.app.conf.result_backend = \
            'dynamodb://@us-east-1/celery_results?read=10&write=20'
        assert self.backend.aws_region == 'us-east-1'
        assert self.backend.table_name == 'celery_results'
        assert self.backend.read_capacity_units == 10
        assert self.backend.write_capacity_units == 20
        assert self.backend.endpoint_url is None
|