test_dynamodb.py 8.2 KB


  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. from decimal import Decimal
  4. import pytest
  5. from case import MagicMock, Mock, patch, sentinel, skip
  6. from celery.backends import dynamodb as module
  7. from celery.backends.dynamodb import DynamoDBBackend
  8. from celery.exceptions import ImproperlyConfigured
  9. from celery.five import string
  10. @skip.unless_module('boto3')
  11. class test_DynamoDBBackend:
  12. def setup(self):
  13. self._static_timestamp = Decimal(1483425566.52) # noqa
  14. self.app.conf.result_backend = 'dynamodb://'
  15. @property
  16. def backend(self):
  17. """:rtype: DynamoDBBackend"""
  18. return self.app.backend
  19. def test_init_no_boto3(self):
  20. prev, module.boto3 = module.boto3, None
  21. try:
  22. with pytest.raises(ImproperlyConfigured):
  23. DynamoDBBackend(app=self.app)
  24. finally:
  25. module.boto3 = prev
  26. def test_init_aws_credentials(self):
  27. with pytest.raises(ImproperlyConfigured):
  28. DynamoDBBackend(
  29. app=self.app,
  30. url='dynamodb://a:@'
  31. )
  32. def test_get_client_local(self):
  33. table_creation_path = \
  34. 'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table'
  35. with patch('boto3.client') as mock_boto_client, \
  36. patch(table_creation_path):
  37. backend = DynamoDBBackend(
  38. app=self.app,
  39. url='dynamodb://@localhost:8000'
  40. )
  41. client = backend._get_client()
  42. assert backend.client is client
  43. mock_boto_client.assert_called_once_with(
  44. 'dynamodb',
  45. endpoint_url='http://localhost:8000',
  46. region_name='us-east-1'
  47. )
  48. assert backend.endpoint_url == 'http://localhost:8000'
  49. def test_get_client_credentials(self):
  50. table_creation_path = \
  51. 'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table'
  52. with patch('boto3.client') as mock_boto_client, \
  53. patch(table_creation_path):
  54. backend = DynamoDBBackend(
  55. app=self.app,
  56. url='dynamodb://key:secret@test'
  57. )
  58. client = backend._get_client()
  59. assert client is backend.client
  60. mock_boto_client.assert_called_once_with(
  61. 'dynamodb',
  62. aws_access_key_id='key',
  63. aws_secret_access_key='secret',
  64. region_name='test'
  65. )
  66. assert backend.aws_region == 'test'
  67. def test_get_or_create_table_not_exists(self):
  68. self.backend._client = MagicMock()
  69. mock_create_table = self.backend._client.create_table = MagicMock()
  70. mock_describe_table = self.backend._client.describe_table = \
  71. MagicMock()
  72. mock_describe_table.return_value = {
  73. 'Table': {
  74. 'TableStatus': 'ACTIVE'
  75. }
  76. }
  77. self.backend._get_or_create_table()
  78. mock_create_table.assert_called_once_with(
  79. **self.backend._get_table_schema()
  80. )
  81. def test_get_or_create_table_already_exists(self):
  82. from botocore.exceptions import ClientError
  83. self.backend._client = MagicMock()
  84. mock_create_table = self.backend._client.create_table = MagicMock()
  85. client_error = ClientError(
  86. {
  87. 'Error': {
  88. 'Code': 'ResourceInUseException',
  89. 'Message': 'Table already exists: {}'.format(
  90. self.backend.table_name
  91. )
  92. }
  93. },
  94. 'CreateTable'
  95. )
  96. mock_create_table.side_effect = client_error
  97. mock_describe_table = self.backend._client.describe_table = \
  98. MagicMock()
  99. mock_describe_table.return_value = {
  100. 'Table': {
  101. 'TableStatus': 'ACTIVE'
  102. }
  103. }
  104. self.backend._get_or_create_table()
  105. mock_describe_table.assert_called_once_with(
  106. TableName=self.backend.table_name
  107. )
  108. def test_wait_for_table_status(self):
  109. self.backend._client = MagicMock()
  110. mock_describe_table = self.backend._client.describe_table = \
  111. MagicMock()
  112. mock_describe_table.side_effect = [
  113. {'Table': {
  114. 'TableStatus': 'CREATING'
  115. }},
  116. {'Table': {
  117. 'TableStatus': 'SOME_STATE'
  118. }}
  119. ]
  120. self.backend._wait_for_table_status(expected='SOME_STATE')
  121. assert mock_describe_table.call_count == 2
  122. def test_prepare_get_request(self):
  123. expected = {
  124. 'TableName': u'celery',
  125. 'Key': {u'id': {u'S': u'abcdef'}}
  126. }
  127. assert self.backend._prepare_get_request('abcdef') == expected
  128. def test_prepare_put_request(self):
  129. expected = {
  130. 'TableName': u'celery',
  131. 'Item': {
  132. u'id': {u'S': u'abcdef'},
  133. u'result': {u'B': u'val'},
  134. u'timestamp': {
  135. u'N': str(Decimal(self._static_timestamp))
  136. }
  137. }
  138. }
  139. with patch('celery.backends.dynamodb.time', self._mock_time):
  140. result = self.backend._prepare_put_request('abcdef', 'val')
  141. assert result == expected
  142. def test_item_to_dict(self):
  143. boto_response = {
  144. 'Item': {
  145. 'id': {
  146. 'S': sentinel.key
  147. },
  148. 'result': {
  149. 'B': sentinel.value
  150. },
  151. 'timestamp': {
  152. 'N': Decimal(1)
  153. }
  154. }
  155. }
  156. converted = self.backend._item_to_dict(boto_response)
  157. assert converted == {
  158. 'id': sentinel.key,
  159. 'result': sentinel.value,
  160. 'timestamp': Decimal(1)
  161. }
  162. def test_get(self):
  163. self.backend._client = Mock(name='_client')
  164. self.backend._client.get_item = MagicMock()
  165. assert self.backend.get('1f3fab') is None
  166. self.backend.client.get_item.assert_called_once_with(
  167. Key={u'id': {u'S': u'1f3fab'}},
  168. TableName='celery'
  169. )
  170. def _mock_time(self):
  171. return self._static_timestamp
  172. def test_set(self):
  173. self.backend._client = MagicMock()
  174. self.backend._client.put_item = MagicMock()
  175. # should return None
  176. with patch('celery.backends.dynamodb.time', self._mock_time):
  177. assert self.backend.set(sentinel.key, sentinel.value) is None
  178. assert self.backend._client.put_item.call_count == 1
  179. _, call_kwargs = self.backend._client.put_item.call_args
  180. expected_kwargs = dict(
  181. Item={
  182. u'timestamp': {u'N': str(self._static_timestamp)},
  183. u'id': {u'S': string(sentinel.key)},
  184. u'result': {u'B': sentinel.value}
  185. },
  186. TableName='celery'
  187. )
  188. assert call_kwargs['Item'] == expected_kwargs['Item']
  189. assert call_kwargs['TableName'] == 'celery'
  190. def test_delete(self):
  191. self.backend._client = Mock(name='_client')
  192. mocked_delete = self.backend._client.delete = Mock('client.delete')
  193. mocked_delete.return_value = None
  194. # should return None
  195. assert self.backend.delete('1f3fab') is None
  196. self.backend.client.delete_item.assert_called_once_with(
  197. Key={u'id': {u'S': u'1f3fab'}},
  198. TableName='celery'
  199. )
  200. def test_backend_by_url(self, url='dynamodb://'):
  201. from celery.app import backends
  202. from celery.backends.dynamodb import DynamoDBBackend
  203. backend, url_ = backends.by_url(url, self.app.loader)
  204. assert backend is DynamoDBBackend
  205. assert url_ == url
  206. def test_backend_params_by_url(self):
  207. self.app.conf.result_backend = \
  208. 'dynamodb://@us-east-1/celery_results?read=10&write=20'
  209. assert self.backend.aws_region == 'us-east-1'
  210. assert self.backend.table_name == 'celery_results'
  211. assert self.backend.read_capacity_units == 10
  212. assert self.backend.write_capacity_units == 20
  213. assert self.backend.endpoint_url is None