test_mongodb.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. from __future__ import absolute_import, unicode_literals
  2. import datetime
  3. from pickle import dumps, loads
  4. import pytest
  5. from case import ANY, MagicMock, Mock, mock, patch, sentinel, skip
  6. from kombu.exceptions import EncodeError
  7. from celery import states, uuid
  8. from celery.backends.mongodb import InvalidDocument, MongoBackend
  9. from celery.exceptions import ImproperlyConfigured
  10. COLLECTION = 'taskmeta_celery'
  11. TASK_ID = uuid()
  12. MONGODB_HOST = 'localhost'
  13. MONGODB_PORT = 27017
  14. MONGODB_USER = 'mongo'
  15. MONGODB_PASSWORD = '1234'
  16. MONGODB_DATABASE = 'testing'
  17. MONGODB_COLLECTION = 'collection1'
  18. MONGODB_GROUP_COLLECTION = 'group_collection1'
  19. @skip.unless_module('pymongo')
  20. class test_MongoBackend:
  21. default_url = 'mongodb://uuuu:pwpw@hostname.dom/database'
  22. replica_set_url = (
  23. 'mongodb://uuuu:pwpw@hostname.dom,'
  24. 'hostname.dom/database?replicaSet=rs'
  25. )
  26. sanitized_default_url = 'mongodb://uuuu:**@hostname.dom/database'
  27. sanitized_replica_set_url = (
  28. 'mongodb://uuuu:**@hostname.dom/,'
  29. 'hostname.dom/database?replicaSet=rs'
  30. )
  31. def setup(self):
  32. self.patching('celery.backends.mongodb.MongoBackend.encode')
  33. self.patching('celery.backends.mongodb.MongoBackend.decode')
  34. self.patching('celery.backends.mongodb.Binary')
  35. self.patching('datetime.datetime')
  36. self.backend = MongoBackend(app=self.app, url=self.default_url)
  37. def test_init_no_mongodb(self, patching):
  38. patching('celery.backends.mongodb.pymongo', None)
  39. with pytest.raises(ImproperlyConfigured):
  40. MongoBackend(app=self.app)
  41. def test_init_no_settings(self):
  42. self.app.conf.mongodb_backend_settings = []
  43. with pytest.raises(ImproperlyConfigured):
  44. MongoBackend(app=self.app)
  45. def test_init_settings_is_None(self):
  46. self.app.conf.mongodb_backend_settings = None
  47. MongoBackend(app=self.app)
  48. def test_init_with_settings(self):
  49. self.app.conf.mongodb_backend_settings = None
  50. # empty settings
  51. mb = MongoBackend(app=self.app)
  52. # uri
  53. uri = 'mongodb://localhost:27017'
  54. mb = MongoBackend(app=self.app, url=uri)
  55. assert mb.mongo_host == ['localhost:27017']
  56. assert mb.options == mb._prepare_client_options()
  57. assert mb.database_name == 'celery'
  58. # uri with database name
  59. uri = 'mongodb://localhost:27017/celerydb'
  60. mb = MongoBackend(app=self.app, url=uri)
  61. assert mb.database_name == 'celerydb'
  62. # uri with user, password, database name, replica set
  63. uri = ('mongodb://'
  64. 'celeryuser:celerypassword@'
  65. 'mongo1.example.com:27017,'
  66. 'mongo2.example.com:27017,'
  67. 'mongo3.example.com:27017/'
  68. 'celerydatabase?replicaSet=rs0')
  69. mb = MongoBackend(app=self.app, url=uri)
  70. assert mb.mongo_host == [
  71. 'mongo1.example.com:27017',
  72. 'mongo2.example.com:27017',
  73. 'mongo3.example.com:27017',
  74. ]
  75. assert mb.options == dict(
  76. mb._prepare_client_options(),
  77. replicaset='rs0',
  78. )
  79. assert mb.user == 'celeryuser'
  80. assert mb.password == 'celerypassword'
  81. assert mb.database_name == 'celerydatabase'
  82. # same uri, change some parameters in backend settings
  83. self.app.conf.mongodb_backend_settings = {
  84. 'replicaset': 'rs1',
  85. 'user': 'backenduser',
  86. 'database': 'another_db',
  87. 'options': {
  88. 'socketKeepAlive': True,
  89. },
  90. }
  91. mb = MongoBackend(app=self.app, url=uri)
  92. assert mb.mongo_host == [
  93. 'mongo1.example.com:27017',
  94. 'mongo2.example.com:27017',
  95. 'mongo3.example.com:27017',
  96. ]
  97. assert mb.options == dict(
  98. mb._prepare_client_options(),
  99. replicaset='rs1',
  100. socketKeepAlive=True,
  101. )
  102. assert mb.user == 'backenduser'
  103. assert mb.password == 'celerypassword'
  104. assert mb.database_name == 'another_db'
  105. mb = MongoBackend(app=self.app, url='mongodb://')
  106. @pytest.mark.usefixtures('depends_on_current_app')
  107. def test_reduce(self):
  108. x = MongoBackend(app=self.app)
  109. assert loads(dumps(x))
  110. def test_get_connection_connection_exists(self):
  111. with patch('pymongo.MongoClient') as mock_Connection:
  112. self.backend._connection = sentinel._connection
  113. connection = self.backend._get_connection()
  114. assert sentinel._connection == connection
  115. mock_Connection.assert_not_called()
  116. def test_get_connection_no_connection_host(self):
  117. with patch('pymongo.MongoClient') as mock_Connection:
  118. self.backend._connection = None
  119. self.backend.host = MONGODB_HOST
  120. self.backend.port = MONGODB_PORT
  121. mock_Connection.return_value = sentinel.connection
  122. connection = self.backend._get_connection()
  123. mock_Connection.assert_called_once_with(
  124. host='mongodb://localhost:27017',
  125. **self.backend._prepare_client_options()
  126. )
  127. assert sentinel.connection == connection
  128. def test_get_connection_no_connection_mongodb_uri(self):
  129. with patch('pymongo.MongoClient') as mock_Connection:
  130. mongodb_uri = 'mongodb://%s:%d' % (MONGODB_HOST, MONGODB_PORT)
  131. self.backend._connection = None
  132. self.backend.host = mongodb_uri
  133. mock_Connection.return_value = sentinel.connection
  134. connection = self.backend._get_connection()
  135. mock_Connection.assert_called_once_with(
  136. host=mongodb_uri, **self.backend._prepare_client_options()
  137. )
  138. assert sentinel.connection == connection
  139. @patch('celery.backends.mongodb.MongoBackend._get_connection')
  140. def test_get_database_no_existing(self, mock_get_connection):
  141. # Should really check for combinations of these two, to be complete.
  142. self.backend.user = MONGODB_USER
  143. self.backend.password = MONGODB_PASSWORD
  144. mock_database = Mock()
  145. mock_connection = MagicMock(spec=['__getitem__'])
  146. mock_connection.__getitem__.return_value = mock_database
  147. mock_get_connection.return_value = mock_connection
  148. database = self.backend.database
  149. assert database is mock_database
  150. assert self.backend.__dict__['database'] is mock_database
  151. mock_database.authenticate.assert_called_once_with(
  152. MONGODB_USER, MONGODB_PASSWORD)
  153. @patch('celery.backends.mongodb.MongoBackend._get_connection')
  154. def test_get_database_no_existing_no_auth(self, mock_get_connection):
  155. # Should really check for combinations of these two, to be complete.
  156. self.backend.user = None
  157. self.backend.password = None
  158. mock_database = Mock()
  159. mock_connection = MagicMock(spec=['__getitem__'])
  160. mock_connection.__getitem__.return_value = mock_database
  161. mock_get_connection.return_value = mock_connection
  162. database = self.backend.database
  163. assert database is mock_database
  164. mock_database.authenticate.assert_not_called()
  165. assert self.backend.__dict__['database'] is mock_database
  166. @patch('celery.backends.mongodb.MongoBackend._get_database')
  167. def test_store_result(self, mock_get_database):
  168. self.backend.taskmeta_collection = MONGODB_COLLECTION
  169. mock_database = MagicMock(spec=['__getitem__', '__setitem__'])
  170. mock_collection = Mock()
  171. mock_get_database.return_value = mock_database
  172. mock_database.__getitem__.return_value = mock_collection
  173. ret_val = self.backend._store_result(
  174. sentinel.task_id, sentinel.result, sentinel.status)
  175. mock_get_database.assert_called_once_with()
  176. mock_database.__getitem__.assert_called_once_with(MONGODB_COLLECTION)
  177. mock_collection.save.assert_called_once_with(ANY)
  178. assert sentinel.result == ret_val
  179. mock_collection.save.side_effect = InvalidDocument()
  180. with pytest.raises(EncodeError):
  181. self.backend._store_result(
  182. sentinel.task_id, sentinel.result, sentinel.status)
  183. @patch('celery.backends.mongodb.MongoBackend._get_database')
  184. def test_get_task_meta_for(self, mock_get_database):
  185. self.backend.taskmeta_collection = MONGODB_COLLECTION
  186. mock_database = MagicMock(spec=['__getitem__', '__setitem__'])
  187. mock_collection = Mock()
  188. mock_collection.find_one.return_value = MagicMock()
  189. mock_get_database.return_value = mock_database
  190. mock_database.__getitem__.return_value = mock_collection
  191. ret_val = self.backend._get_task_meta_for(sentinel.task_id)
  192. mock_get_database.assert_called_once_with()
  193. mock_database.__getitem__.assert_called_once_with(MONGODB_COLLECTION)
  194. assert list(sorted([
  195. 'status', 'task_id', 'date_done',
  196. 'traceback', 'result', 'children',
  197. ])) == list(sorted(ret_val.keys()))
  198. @patch('celery.backends.mongodb.MongoBackend._get_database')
  199. def test_get_task_meta_for_no_result(self, mock_get_database):
  200. self.backend.taskmeta_collection = MONGODB_COLLECTION
  201. mock_database = MagicMock(spec=['__getitem__', '__setitem__'])
  202. mock_collection = Mock()
  203. mock_collection.find_one.return_value = None
  204. mock_get_database.return_value = mock_database
  205. mock_database.__getitem__.return_value = mock_collection
  206. ret_val = self.backend._get_task_meta_for(sentinel.task_id)
  207. mock_get_database.assert_called_once_with()
  208. mock_database.__getitem__.assert_called_once_with(MONGODB_COLLECTION)
  209. assert {'status': states.PENDING, 'result': None} == ret_val
  210. @patch('celery.backends.mongodb.MongoBackend._get_database')
  211. def test_save_group(self, mock_get_database):
  212. self.backend.groupmeta_collection = MONGODB_GROUP_COLLECTION
  213. mock_database = MagicMock(spec=['__getitem__', '__setitem__'])
  214. mock_collection = Mock()
  215. mock_get_database.return_value = mock_database
  216. mock_database.__getitem__.return_value = mock_collection
  217. res = [self.app.AsyncResult(i) for i in range(3)]
  218. ret_val = self.backend._save_group(
  219. sentinel.taskset_id, res,
  220. )
  221. mock_get_database.assert_called_once_with()
  222. mock_database.__getitem__.assert_called_once_with(
  223. MONGODB_GROUP_COLLECTION,
  224. )
  225. mock_collection.save.assert_called_once_with(ANY)
  226. assert res == ret_val
  227. @patch('celery.backends.mongodb.MongoBackend._get_database')
  228. def test_restore_group(self, mock_get_database):
  229. self.backend.groupmeta_collection = MONGODB_GROUP_COLLECTION
  230. mock_database = MagicMock(spec=['__getitem__', '__setitem__'])
  231. mock_collection = Mock()
  232. mock_collection.find_one.return_value = {
  233. '_id': sentinel.taskset_id,
  234. 'result': [uuid(), uuid()],
  235. 'date_done': 1,
  236. }
  237. self.backend.decode.side_effect = lambda r: r
  238. mock_get_database.return_value = mock_database
  239. mock_database.__getitem__.return_value = mock_collection
  240. ret_val = self.backend._restore_group(sentinel.taskset_id)
  241. mock_get_database.assert_called_once_with()
  242. mock_collection.find_one.assert_called_once_with(
  243. {'_id': sentinel.taskset_id})
  244. assert (sorted(['date_done', 'result', 'task_id']) ==
  245. sorted(list(ret_val.keys())))
  246. mock_collection.find_one.return_value = None
  247. self.backend._restore_group(sentinel.taskset_id)
  248. @patch('celery.backends.mongodb.MongoBackend._get_database')
  249. def test_delete_group(self, mock_get_database):
  250. self.backend.taskmeta_collection = MONGODB_COLLECTION
  251. mock_database = MagicMock(spec=['__getitem__', '__setitem__'])
  252. mock_collection = Mock()
  253. mock_get_database.return_value = mock_database
  254. mock_database.__getitem__.return_value = mock_collection
  255. self.backend._delete_group(sentinel.taskset_id)
  256. mock_get_database.assert_called_once_with()
  257. mock_collection.remove.assert_called_once_with(
  258. {'_id': sentinel.taskset_id})
  259. @patch('celery.backends.mongodb.MongoBackend._get_database')
  260. def test_forget(self, mock_get_database):
  261. self.backend.taskmeta_collection = MONGODB_COLLECTION
  262. mock_database = MagicMock(spec=['__getitem__', '__setitem__'])
  263. mock_collection = Mock()
  264. mock_get_database.return_value = mock_database
  265. mock_database.__getitem__.return_value = mock_collection
  266. self.backend._forget(sentinel.task_id)
  267. mock_get_database.assert_called_once_with()
  268. mock_database.__getitem__.assert_called_once_with(
  269. MONGODB_COLLECTION)
  270. mock_collection.remove.assert_called_once_with(
  271. {'_id': sentinel.task_id})
  272. @patch('celery.backends.mongodb.MongoBackend._get_database')
  273. def test_cleanup(self, mock_get_database):
  274. self.backend.taskmeta_collection = MONGODB_COLLECTION
  275. self.backend.groupmeta_collection = MONGODB_GROUP_COLLECTION
  276. mock_database = Mock(spec=['__getitem__', '__setitem__'],
  277. name='MD')
  278. self.backend.collections = mock_collection = Mock()
  279. mock_get_database.return_value = mock_database
  280. mock_database.__getitem__ = Mock(name='MD.__getitem__')
  281. mock_database.__getitem__.return_value = mock_collection
  282. self.backend.app.now = datetime.datetime.utcnow
  283. self.backend.cleanup()
  284. mock_get_database.assert_called_once_with()
  285. mock_collection.remove.assert_called()
  286. def test_get_database_authfailure(self):
  287. x = MongoBackend(app=self.app)
  288. x._get_connection = Mock()
  289. conn = x._get_connection.return_value = {}
  290. db = conn[x.database_name] = Mock()
  291. db.authenticate.return_value = False
  292. x.user = 'jerry'
  293. x.password = 'cere4l'
  294. with pytest.raises(ImproperlyConfigured):
  295. x._get_database()
  296. db.authenticate.assert_called_with('jerry', 'cere4l')
  297. def test_prepare_client_options(self):
  298. with patch('pymongo.version_tuple', new=(3, 0, 3)):
  299. options = self.backend._prepare_client_options()
  300. assert options == {
  301. 'maxPoolSize': self.backend.max_pool_size
  302. }
  303. def test_as_uri_include_password(self):
  304. assert self.backend.as_uri(True) == self.default_url
  305. def test_as_uri_exclude_password(self):
  306. assert self.backend.as_uri() == self.sanitized_default_url
  307. def test_as_uri_include_password_replica_set(self):
  308. backend = MongoBackend(app=self.app, url=self.replica_set_url)
  309. assert backend.as_uri(True) == self.replica_set_url
  310. def test_as_uri_exclude_password_replica_set(self):
  311. backend = MongoBackend(app=self.app, url=self.replica_set_url)
  312. assert backend.as_uri() == self.sanitized_replica_set_url
  313. def test_regression_worker_startup_info(self):
  314. self.app.conf.result_backend = (
  315. 'mongodb://user:password@host0.com:43437,host1.com:43437'
  316. '/work4us?replicaSet=rs&ssl=true'
  317. )
  318. worker = self.app.Worker()
  319. with mock.stdouts():
  320. worker.on_start()
  321. assert worker.startup_info()
  322. @skip.unless_module('pymongo')
  323. class test_MongoBackend_no_mock:
  324. def test_encode_decode(self, app):
  325. backend = MongoBackend(app=app)
  326. data = {'foo': 1}
  327. assert backend.decode(backend.encode(data))
  328. backend.serializer = 'bson'
  329. assert backend.encode(data) == data
  330. assert backend.decode(data) == data
  331. def test_de(self, app):
  332. backend = MongoBackend(app=app)
  333. data = {'foo': 1}
  334. assert backend.encode(data)
  335. backend.serializer = 'bson'
  336. assert backend.encode(data) == data