test_mongodb.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. import datetime
  4. from pickle import loads, dumps
  5. from case import ANY, MagicMock, Mock, mock, patch, sentinel, skip
  6. from kombu.exceptions import EncodeError
  7. from celery import uuid
  8. from celery import states
  9. from celery.backends.mongodb import InvalidDocument, MongoBackend
  10. from celery.exceptions import ImproperlyConfigured
  # Fixture values shared by the MongoDB backend tests in this module.
  COLLECTION = 'taskmeta_celery'
  TASK_ID = uuid()  # generated once, when the module is imported

  # Connection parameters that individual tests assign onto the backend.
  MONGODB_HOST, MONGODB_PORT = 'localhost', 27017
  MONGODB_USER, MONGODB_PASSWORD = 'mongo', '1234'
  MONGODB_DATABASE = 'testing'

  # Collection names that tests install as the backend's task / group
  # collection before exercising it.
  MONGODB_COLLECTION = 'collection1'
  MONGODB_GROUP_COLLECTION = 'group_collection1'
  @skip.unless_module('pymongo')
  class test_MongoBackend:
      """Tests for ``MongoBackend`` with its collaborators mocked.

      ``setup`` patches the backend's ``encode``/``decode`` methods, the
      ``Binary`` name in the backend module and ``datetime.datetime``, so
      the tests below only observe how the backend talks to the (mocked)
      connection, database and collection objects.
      """

      default_url = 'mongodb://uuuu:pwpw@hostname.dom/database'
      replica_set_url = (
          'mongodb://uuuu:pwpw@hostname.dom,'
          'hostname.dom/database?replicaSet=rs'
      )
      sanitized_default_url = 'mongodb://uuuu:**@hostname.dom/database'
      sanitized_replica_set_url = (
          'mongodb://uuuu:**@hostname.dom/,'
          'hostname.dom/database?replicaSet=rs'
      )

      def setup(self):
          """Install the shared patches and build ``self.backend``."""
          for target in (
              'celery.backends.mongodb.MongoBackend.encode',
              'celery.backends.mongodb.MongoBackend.decode',
              'celery.backends.mongodb.Binary',
              'datetime.datetime',
          ):
              self.patching(target)
          self.backend = MongoBackend(app=self.app, url=self.default_url)

      def _wire_collection(self, mock_get_database):
          """Point a patched ``_get_database`` at a mock database.

          The database only supports item access, and every lookup yields
          the same mock collection.

          :param mock_get_database: the mock replacing
              ``MongoBackend._get_database``.
          :returns: ``(mock_database, mock_collection)``.
          """
          mock_database = MagicMock(spec=['__getitem__', '__setitem__'])
          mock_collection = Mock()
          mock_get_database.return_value = mock_database
          mock_database.__getitem__.return_value = mock_collection
          return mock_database, mock_collection

      def test_init_no_mongodb(self, patching):
          """Construction fails when the ``pymongo`` module is missing."""
          patching('celery.backends.mongodb.pymongo', None)
          with pytest.raises(ImproperlyConfigured):
              MongoBackend(app=self.app)

      def test_init_no_settings(self):
          """A list for ``mongodb_backend_settings`` is rejected."""
          self.app.conf.mongodb_backend_settings = []
          with pytest.raises(ImproperlyConfigured):
              MongoBackend(app=self.app)

      def test_init_settings_is_None(self):
          """``None`` settings are accepted (construction must not raise)."""
          self.app.conf.mongodb_backend_settings = None
          MongoBackend(app=self.app)

      def test_init_with_settings(self):
          """URL parts and backend settings end up on the backend."""
          self.app.conf.mongodb_backend_settings = None

          # empty settings: only checks that construction does not raise
          MongoBackend(app=self.app)

          # uri
          uri = 'mongodb://localhost:27017'
          mb = MongoBackend(app=self.app, url=uri)
          assert mb.mongo_host == ['localhost:27017']
          assert mb.options == mb._prepare_client_options()
          assert mb.database_name == 'celery'

          # uri with database name
          uri = 'mongodb://localhost:27017/celerydb'
          mb = MongoBackend(app=self.app, url=uri)
          assert mb.database_name == 'celerydb'

          # uri with user, password, database name, replica set
          uri = ('mongodb://'
                 'celeryuser:celerypassword@'
                 'mongo1.example.com:27017,'
                 'mongo2.example.com:27017,'
                 'mongo3.example.com:27017/'
                 'celerydatabase?replicaSet=rs0')
          expected_hosts = [
              'mongo1.example.com:27017',
              'mongo2.example.com:27017',
              'mongo3.example.com:27017',
          ]
          mb = MongoBackend(app=self.app, url=uri)
          assert mb.mongo_host == expected_hosts
          assert mb.options == dict(
              mb._prepare_client_options(),
              replicaset='rs0',
          )
          assert mb.user == 'celeryuser'
          assert mb.password == 'celerypassword'
          assert mb.database_name == 'celerydatabase'

          # same uri, change some parameters in backend settings
          self.app.conf.mongodb_backend_settings = {
              'replicaset': 'rs1',
              'user': 'backenduser',
              'database': 'another_db',
              'options': {
                  'socketKeepAlive': True,
              },
          }
          mb = MongoBackend(app=self.app, url=uri)
          assert mb.mongo_host == expected_hosts
          assert mb.options == dict(
              mb._prepare_client_options(),
              replicaset='rs1',
              socketKeepAlive=True,
          )
          assert mb.user == 'backenduser'
          assert mb.password == 'celerypassword'
          assert mb.database_name == 'another_db'

          # bare scheme: only checks that construction does not raise
          MongoBackend(app=self.app, url='mongodb://')

      @pytest.mark.usefixtures('depends_on_current_app')
      def test_reduce(self):
          """The backend survives a pickle round trip."""
          x = MongoBackend(app=self.app)
          assert loads(dumps(x))

      def test_get_connection_connection_exists(self):
          """An existing connection is returned without a new client."""
          with patch('pymongo.MongoClient') as mock_Connection:
              self.backend._connection = sentinel._connection

              connection = self.backend._get_connection()

              assert sentinel._connection == connection
              mock_Connection.assert_not_called()

      def test_get_connection_no_connection_host(self):
          """With host and port set, the client gets a ``mongodb://`` URL."""
          with patch('pymongo.MongoClient') as mock_Connection:
              self.backend._connection = None
              self.backend.host = MONGODB_HOST
              self.backend.port = MONGODB_PORT
              mock_Connection.return_value = sentinel.connection

              connection = self.backend._get_connection()

              mock_Connection.assert_called_once_with(
                  host='mongodb://localhost:27017',
                  **self.backend._prepare_client_options()
              )
              assert sentinel.connection == connection

      def test_get_connection_no_connection_mongodb_uri(self):
          """A host that is already a URI is passed to the client as is."""
          with patch('pymongo.MongoClient') as mock_Connection:
              mongodb_uri = 'mongodb://%s:%d' % (MONGODB_HOST, MONGODB_PORT)
              self.backend._connection = None
              self.backend.host = mongodb_uri
              mock_Connection.return_value = sentinel.connection

              connection = self.backend._get_connection()

              mock_Connection.assert_called_once_with(
                  host=mongodb_uri,
                  **self.backend._prepare_client_options()
              )
              assert sentinel.connection == connection

      @patch('celery.backends.mongodb.MongoBackend._get_connection')
      def test_get_database_no_existing(self, mock_get_connection):
          """With credentials set, the database is authenticated and cached."""
          # Should really check for combinations of these two, to be complete.
          self.backend.user = MONGODB_USER
          self.backend.password = MONGODB_PASSWORD

          mock_database = Mock()
          mock_connection = MagicMock(spec=['__getitem__'])
          mock_connection.__getitem__.return_value = mock_database
          mock_get_connection.return_value = mock_connection

          database = self.backend.database

          assert database is mock_database
          assert self.backend.__dict__['database'] is mock_database
          mock_database.authenticate.assert_called_once_with(
              MONGODB_USER, MONGODB_PASSWORD)

      @patch('celery.backends.mongodb.MongoBackend._get_connection')
      def test_get_database_no_existing_no_auth(self, mock_get_connection):
          """Without credentials, ``authenticate`` is never called."""
          # Should really check for combinations of these two, to be complete.
          self.backend.user = None
          self.backend.password = None

          mock_database = Mock()
          mock_connection = MagicMock(spec=['__getitem__'])
          mock_connection.__getitem__.return_value = mock_database
          mock_get_connection.return_value = mock_connection

          database = self.backend.database

          assert database is mock_database
          mock_database.authenticate.assert_not_called()
          assert self.backend.__dict__['database'] is mock_database

      @patch('celery.backends.mongodb.MongoBackend._get_database')
      def test_store_result(self, mock_get_database):
          """``_store_result`` saves one document and returns the result.

          An ``InvalidDocument`` raised by the collection must surface as
          ``EncodeError``.
          """
          self.backend.taskmeta_collection = MONGODB_COLLECTION
          mock_database, mock_collection = self._wire_collection(
              mock_get_database)

          ret_val = self.backend._store_result(
              sentinel.task_id, sentinel.result, sentinel.status)

          mock_get_database.assert_called_once_with()
          mock_database.__getitem__.assert_called_once_with(
              MONGODB_COLLECTION)
          mock_collection.save.assert_called_once_with(ANY)
          assert sentinel.result == ret_val

          mock_collection.save.side_effect = InvalidDocument()
          with pytest.raises(EncodeError):
              self.backend._store_result(
                  sentinel.task_id, sentinel.result, sentinel.status)

      @patch('celery.backends.mongodb.MongoBackend._get_database')
      def test_get_task_meta_for(self, mock_get_database):
          """A found document is returned with the expected set of keys."""
          self.backend.taskmeta_collection = MONGODB_COLLECTION
          mock_database, mock_collection = self._wire_collection(
              mock_get_database)
          mock_collection.find_one.return_value = MagicMock()

          ret_val = self.backend._get_task_meta_for(sentinel.task_id)

          mock_get_database.assert_called_once_with()
          mock_database.__getitem__.assert_called_once_with(
              MONGODB_COLLECTION)
          assert sorted([
              'status', 'task_id', 'date_done',
              'traceback', 'result', 'children',
          ]) == sorted(ret_val.keys())

      @patch('celery.backends.mongodb.MongoBackend._get_database')
      def test_get_task_meta_for_no_result(self, mock_get_database):
          """A missing document yields a PENDING placeholder."""
          self.backend.taskmeta_collection = MONGODB_COLLECTION
          mock_database, mock_collection = self._wire_collection(
              mock_get_database)
          mock_collection.find_one.return_value = None

          ret_val = self.backend._get_task_meta_for(sentinel.task_id)

          mock_get_database.assert_called_once_with()
          mock_database.__getitem__.assert_called_once_with(
              MONGODB_COLLECTION)
          assert {'status': states.PENDING, 'result': None} == ret_val

      @patch('celery.backends.mongodb.MongoBackend._get_database')
      def test_save_group(self, mock_get_database):
          """``_save_group`` saves to the group collection, returns input."""
          self.backend.groupmeta_collection = MONGODB_GROUP_COLLECTION
          mock_database, mock_collection = self._wire_collection(
              mock_get_database)
          res = [self.app.AsyncResult(i) for i in range(3)]

          ret_val = self.backend._save_group(
              sentinel.taskset_id, res,
          )

          mock_get_database.assert_called_once_with()
          mock_database.__getitem__.assert_called_once_with(
              MONGODB_GROUP_COLLECTION,
          )
          mock_collection.save.assert_called_once_with(ANY)
          assert res == ret_val

      @patch('celery.backends.mongodb.MongoBackend._get_database')
      def test_restore_group(self, mock_get_database):
          """``_restore_group`` looks up by ``_id`` and returns known keys.

          A second call with no document found must not raise.
          """
          self.backend.groupmeta_collection = MONGODB_GROUP_COLLECTION
          _, mock_collection = self._wire_collection(mock_get_database)
          mock_collection.find_one.return_value = {
              '_id': sentinel.taskset_id,
              'result': [uuid(), uuid()],
              'date_done': 1,
          }
          # decode is patched in setup; make it the identity function here
          self.backend.decode.side_effect = lambda r: r

          ret_val = self.backend._restore_group(sentinel.taskset_id)

          mock_get_database.assert_called_once_with()
          mock_collection.find_one.assert_called_once_with(
              {'_id': sentinel.taskset_id})
          assert (sorted(['date_done', 'result', 'task_id']) ==
                  sorted(ret_val.keys()))

          mock_collection.find_one.return_value = None
          self.backend._restore_group(sentinel.taskset_id)

      @patch('celery.backends.mongodb.MongoBackend._get_database')
      def test_delete_group(self, mock_get_database):
          """``_delete_group`` removes the document from the group collection."""
          # This is a group operation, so configure the group collection
          # name, as test_save_group and test_restore_group do.
          self.backend.groupmeta_collection = MONGODB_GROUP_COLLECTION
          mock_database, mock_collection = self._wire_collection(
              mock_get_database)

          self.backend._delete_group(sentinel.taskset_id)

          mock_get_database.assert_called_once_with()
          mock_database.__getitem__.assert_called_once_with(
              MONGODB_GROUP_COLLECTION,
          )
          mock_collection.remove.assert_called_once_with(
              {'_id': sentinel.taskset_id})

      @patch('celery.backends.mongodb.MongoBackend._get_database')
      def test_forget(self, mock_get_database):
          """``_forget`` removes the task document from the task collection."""
          self.backend.taskmeta_collection = MONGODB_COLLECTION
          mock_database, mock_collection = self._wire_collection(
              mock_get_database)

          self.backend._forget(sentinel.task_id)

          mock_get_database.assert_called_once_with()
          mock_database.__getitem__.assert_called_once_with(
              MONGODB_COLLECTION)
          mock_collection.remove.assert_called_once_with(
              {'_id': sentinel.task_id})

      @patch('celery.backends.mongodb.MongoBackend._get_database')
      def test_cleanup(self, mock_get_database):
          """``cleanup`` calls ``remove`` on the collection."""
          self.backend.taskmeta_collection = MONGODB_COLLECTION
          self.backend.groupmeta_collection = MONGODB_GROUP_COLLECTION

          # A plain Mock (not MagicMock) is used here, so __getitem__ has
          # to be installed explicitly.
          mock_database = Mock(spec=['__getitem__', '__setitem__'],
                               name='MD')
          self.backend.collections = mock_collection = Mock()

          mock_get_database.return_value = mock_database
          mock_database.__getitem__ = Mock(name='MD.__getitem__')
          mock_database.__getitem__.return_value = mock_collection

          # datetime.datetime is patched in setup, so this is a mock clock
          self.backend.app.now = datetime.datetime.utcnow
          self.backend.cleanup()

          mock_get_database.assert_called_once_with()
          mock_collection.remove.assert_called()

      def test_get_database_authfailure(self):
          """A falsy ``authenticate`` result raises ImproperlyConfigured."""
          x = MongoBackend(app=self.app)
          x._get_connection = Mock()
          conn = x._get_connection.return_value = {}
          db = conn[x.database_name] = Mock()
          db.authenticate.return_value = False
          x.user = 'jerry'
          x.password = 'cere4l'
          with pytest.raises(ImproperlyConfigured):
              x._get_database()
          db.authenticate.assert_called_with('jerry', 'cere4l')

      def test_prepare_client_options(self):
          """For pymongo 3.0.3 the only client option is ``maxPoolSize``."""
          with patch('pymongo.version_tuple', new=(3, 0, 3)):
              options = self.backend._prepare_client_options()
              assert options == {
                  'maxPoolSize': self.backend.max_pool_size
              }

      def test_as_uri_include_password(self):
          """``as_uri(True)`` reproduces the URL including the password."""
          assert self.backend.as_uri(True) == self.default_url

      def test_as_uri_exclude_password(self):
          """``as_uri()`` masks the password."""
          assert self.backend.as_uri() == self.sanitized_default_url

      def test_as_uri_include_password_replica_set(self):
          """``as_uri(True)`` reproduces a replica-set URL unchanged."""
          backend = MongoBackend(app=self.app, url=self.replica_set_url)
          assert backend.as_uri(True) == self.replica_set_url

      def test_as_uri_exclude_password_replica_set(self):
          """``as_uri()`` masks the password of a replica-set URL."""
          backend = MongoBackend(app=self.app, url=self.replica_set_url)
          assert backend.as_uri() == self.sanitized_replica_set_url

      def test_regression_worker_startup_info(self):
          """Worker startup info renders with a multi-host MongoDB URL."""
          self.app.conf.result_backend = (
              'mongodb://user:password@host0.com:43437,host1.com:43437'
              '/work4us?replicaSet=rs&ssl=true'
          )
          worker = self.app.Worker()
          with mock.stdouts():
              worker.on_start()
              assert worker.startup_info()
  @skip.unless_module('pymongo')
  class test_MongoBackend_no_mock:
      """Tests that call the backend's real ``encode``/``decode`` methods."""

      def test_encode_decode(self, app):
          """Encoded data decodes to something truthy; 'bson' is a no-op."""
          payload = {'foo': 1}
          mongo = MongoBackend(app=app)

          encoded = mongo.encode(payload)
          assert mongo.decode(encoded)

          # With the 'bson' serializer both directions return the input.
          mongo.serializer = 'bson'
          assert mongo.encode(payload) == payload
          assert mongo.decode(payload) == payload

      def test_de(self, app):
          """``encode`` is truthy by default and a no-op with 'bson'."""
          payload = {'foo': 1}
          mongo = MongoBackend(app=app)

          assert mongo.encode(payload)

          mongo.serializer = 'bson'
          assert mongo.encode(payload) == payload
  335. assert backend.encode(data)
  336. backend.serializer = 'bson'
  337. assert backend.encode(data) == data