test_dynamodb.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. from decimal import Decimal
  4. import pytest
  5. from case import MagicMock, Mock, patch, sentinel, skip
  6. from celery.backends import dynamodb as module
  7. from celery.backends.dynamodb import DynamoDBBackend
  8. from celery.exceptions import ImproperlyConfigured
  9. from celery.five import string
  10. @skip.unless_module('boto3')
  11. class test_DynamoDBBackend:
  12. def setup(self):
  13. self._static_timestamp = Decimal(1483425566.52) # noqa
  14. self.app.conf.result_backend = 'dynamodb://'
  15. @property
  16. def backend(self):
  17. """:rtype: DynamoDBBackend"""
  18. return self.app.backend
  19. def test_init_no_boto3(self):
  20. prev, module.boto3 = module.boto3, None
  21. try:
  22. with pytest.raises(ImproperlyConfigured):
  23. DynamoDBBackend(app=self.app)
  24. finally:
  25. module.boto3 = prev
  26. def test_init_aws_credentials(self):
  27. with pytest.raises(ImproperlyConfigured):
  28. DynamoDBBackend(
  29. app=self.app,
  30. url='dynamodb://a:@'
  31. )
  32. def test_get_client_explicit_endpoint(self):
  33. table_creation_path = \
  34. 'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table'
  35. with patch('boto3.client') as mock_boto_client, \
  36. patch(table_creation_path):
  37. self.app.conf.dynamodb_endpoint_url = 'http://my.domain.com:666'
  38. backend = DynamoDBBackend(
  39. app=self.app,
  40. url='dynamodb://@us-east-1'
  41. )
  42. client = backend._get_client()
  43. assert backend.client is client
  44. mock_boto_client.assert_called_once_with(
  45. 'dynamodb',
  46. endpoint_url='http://my.domain.com:666',
  47. region_name='us-east-1'
  48. )
  49. assert backend.endpoint_url == 'http://my.domain.com:666'
  50. def test_get_client_local(self):
  51. table_creation_path = \
  52. 'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table'
  53. with patch('boto3.client') as mock_boto_client, \
  54. patch(table_creation_path):
  55. backend = DynamoDBBackend(
  56. app=self.app,
  57. url='dynamodb://@localhost:8000'
  58. )
  59. client = backend._get_client()
  60. assert backend.client is client
  61. mock_boto_client.assert_called_once_with(
  62. 'dynamodb',
  63. endpoint_url='http://localhost:8000',
  64. region_name='us-east-1'
  65. )
  66. assert backend.endpoint_url == 'http://localhost:8000'
  67. def test_get_client_credentials(self):
  68. table_creation_path = \
  69. 'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table'
  70. with patch('boto3.client') as mock_boto_client, \
  71. patch(table_creation_path):
  72. backend = DynamoDBBackend(
  73. app=self.app,
  74. url='dynamodb://key:secret@test'
  75. )
  76. client = backend._get_client()
  77. assert client is backend.client
  78. mock_boto_client.assert_called_once_with(
  79. 'dynamodb',
  80. aws_access_key_id='key',
  81. aws_secret_access_key='secret',
  82. region_name='test'
  83. )
  84. assert backend.aws_region == 'test'
  85. def test_get_or_create_table_not_exists(self):
  86. self.backend._client = MagicMock()
  87. mock_create_table = self.backend._client.create_table = MagicMock()
  88. mock_describe_table = self.backend._client.describe_table = \
  89. MagicMock()
  90. mock_describe_table.return_value = {
  91. 'Table': {
  92. 'TableStatus': 'ACTIVE'
  93. }
  94. }
  95. self.backend._get_or_create_table()
  96. mock_create_table.assert_called_once_with(
  97. **self.backend._get_table_schema()
  98. )
  99. def test_get_or_create_table_already_exists(self):
  100. from botocore.exceptions import ClientError
  101. self.backend._client = MagicMock()
  102. mock_create_table = self.backend._client.create_table = MagicMock()
  103. client_error = ClientError(
  104. {
  105. 'Error': {
  106. 'Code': 'ResourceInUseException',
  107. 'Message': 'Table already exists: {}'.format(
  108. self.backend.table_name
  109. )
  110. }
  111. },
  112. 'CreateTable'
  113. )
  114. mock_create_table.side_effect = client_error
  115. mock_describe_table = self.backend._client.describe_table = \
  116. MagicMock()
  117. mock_describe_table.return_value = {
  118. 'Table': {
  119. 'TableStatus': 'ACTIVE'
  120. }
  121. }
  122. self.backend._get_or_create_table()
  123. mock_describe_table.assert_called_once_with(
  124. TableName=self.backend.table_name
  125. )
  126. def test_wait_for_table_status(self):
  127. self.backend._client = MagicMock()
  128. mock_describe_table = self.backend._client.describe_table = \
  129. MagicMock()
  130. mock_describe_table.side_effect = [
  131. {'Table': {
  132. 'TableStatus': 'CREATING'
  133. }},
  134. {'Table': {
  135. 'TableStatus': 'SOME_STATE'
  136. }}
  137. ]
  138. self.backend._wait_for_table_status(expected='SOME_STATE')
  139. assert mock_describe_table.call_count == 2
  140. def test_prepare_get_request(self):
  141. expected = {
  142. 'TableName': u'celery',
  143. 'Key': {u'id': {u'S': u'abcdef'}}
  144. }
  145. assert self.backend._prepare_get_request('abcdef') == expected
  146. def test_prepare_put_request(self):
  147. expected = {
  148. 'TableName': u'celery',
  149. 'Item': {
  150. u'id': {u'S': u'abcdef'},
  151. u'result': {u'B': u'val'},
  152. u'timestamp': {
  153. u'N': str(Decimal(self._static_timestamp))
  154. }
  155. }
  156. }
  157. with patch('celery.backends.dynamodb.time', self._mock_time):
  158. result = self.backend._prepare_put_request('abcdef', 'val')
  159. assert result == expected
  160. def test_item_to_dict(self):
  161. boto_response = {
  162. 'Item': {
  163. 'id': {
  164. 'S': sentinel.key
  165. },
  166. 'result': {
  167. 'B': sentinel.value
  168. },
  169. 'timestamp': {
  170. 'N': Decimal(1)
  171. }
  172. }
  173. }
  174. converted = self.backend._item_to_dict(boto_response)
  175. assert converted == {
  176. 'id': sentinel.key,
  177. 'result': sentinel.value,
  178. 'timestamp': Decimal(1)
  179. }
  180. def test_get(self):
  181. self.backend._client = Mock(name='_client')
  182. self.backend._client.get_item = MagicMock()
  183. assert self.backend.get('1f3fab') is None
  184. self.backend.client.get_item.assert_called_once_with(
  185. Key={u'id': {u'S': u'1f3fab'}},
  186. TableName='celery'
  187. )
  188. def _mock_time(self):
  189. return self._static_timestamp
  190. def test_set(self):
  191. self.backend._client = MagicMock()
  192. self.backend._client.put_item = MagicMock()
  193. # should return None
  194. with patch('celery.backends.dynamodb.time', self._mock_time):
  195. assert self.backend.set(sentinel.key, sentinel.value) is None
  196. assert self.backend._client.put_item.call_count == 1
  197. _, call_kwargs = self.backend._client.put_item.call_args
  198. expected_kwargs = {
  199. 'Item': {
  200. u'timestamp': {u'N': str(self._static_timestamp)},
  201. u'id': {u'S': string(sentinel.key)},
  202. u'result': {u'B': sentinel.value}
  203. },
  204. 'TableName': 'celery'
  205. }
  206. assert call_kwargs['Item'] == expected_kwargs['Item']
  207. assert call_kwargs['TableName'] == 'celery'
  208. def test_delete(self):
  209. self.backend._client = Mock(name='_client')
  210. mocked_delete = self.backend._client.delete = Mock('client.delete')
  211. mocked_delete.return_value = None
  212. # should return None
  213. assert self.backend.delete('1f3fab') is None
  214. self.backend.client.delete_item.assert_called_once_with(
  215. Key={u'id': {u'S': u'1f3fab'}},
  216. TableName='celery'
  217. )
  218. def test_backend_by_url(self, url='dynamodb://'):
  219. from celery.app import backends
  220. from celery.backends.dynamodb import DynamoDBBackend
  221. backend, url_ = backends.by_url(url, self.app.loader)
  222. assert backend is DynamoDBBackend
  223. assert url_ == url
  224. def test_backend_params_by_url(self):
  225. self.app.conf.result_backend = \
  226. 'dynamodb://@us-east-1/celery_results?read=10&write=20'
  227. assert self.backend.aws_region == 'us-east-1'
  228. assert self.backend.table_name == 'celery_results'
  229. assert self.backend.read_capacity_units == 10
  230. assert self.backend.write_capacity_units == 20
  231. assert self.backend.endpoint_url is None