test_redis.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656
  1. from __future__ import absolute_import, unicode_literals
  2. import random
  3. import ssl
  4. from contextlib import contextmanager
  5. from datetime import timedelta
  6. from pickle import dumps, loads
  7. import pytest
  8. from case import ANY, ContextMock, Mock, call, mock, patch, skip
  9. from celery import signature, states, uuid
  10. from celery.canvas import Signature
  11. from celery.exceptions import (ChordError, CPendingDeprecationWarning,
  12. ImproperlyConfigured)
  13. from celery.utils.collections import AttributeDict
  14. def raise_on_second_call(mock, exc, *retval):
  15. def on_first_call(*args, **kwargs):
  16. mock.side_effect = exc
  17. return mock.return_value
  18. mock.side_effect = on_first_call
  19. if retval:
  20. mock.return_value, = retval
  21. class Connection(object):
  22. connected = True
  23. def disconnect(self):
  24. self.connected = False
  25. class Pipeline(object):
  26. def __init__(self, client):
  27. self.client = client
  28. self.steps = []
  29. def __getattr__(self, attr):
  30. def add_step(*args, **kwargs):
  31. self.steps.append((getattr(self.client, attr), args, kwargs))
  32. return self
  33. return add_step
  34. def __enter__(self):
  35. return self
  36. def __exit__(self, type, value, traceback):
  37. pass
  38. def execute(self):
  39. return [step(*a, **kw) for step, a, kw in self.steps]
  40. class Redis(mock.MockCallbacks):
  41. Connection = Connection
  42. Pipeline = Pipeline
  43. def __init__(self, host=None, port=None, db=None, password=None, **kw):
  44. self.host = host
  45. self.port = port
  46. self.db = db
  47. self.password = password
  48. self.keyspace = {}
  49. self.expiry = {}
  50. self.connection = self.Connection()
  51. def get(self, key):
  52. return self.keyspace.get(key)
  53. def setex(self, key, expires, value):
  54. self.set(key, value)
  55. self.expire(key, expires)
  56. def set(self, key, value):
  57. self.keyspace[key] = value
  58. def expire(self, key, expires):
  59. self.expiry[key] = expires
  60. return expires
  61. def delete(self, key):
  62. return bool(self.keyspace.pop(key, None))
  63. def pipeline(self):
  64. return self.Pipeline(self)
  65. def _get_list(self, key):
  66. try:
  67. return self.keyspace[key]
  68. except KeyError:
  69. l = self.keyspace[key] = []
  70. return l
  71. def rpush(self, key, value):
  72. self._get_list(key).append(value)
  73. def lrange(self, key, start, stop):
  74. return self._get_list(key)[start:stop]
  75. def llen(self, key):
  76. return len(self.keyspace.get(key) or [])
  77. class Sentinel(mock.MockCallbacks):
  78. def __init__(self, sentinels, min_other_sentinels=0, sentinel_kwargs=None,
  79. **connection_kwargs):
  80. self.sentinel_kwargs = sentinel_kwargs
  81. self.sentinels = [Redis(hostname, port, **self.sentinel_kwargs)
  82. for hostname, port in sentinels]
  83. self.min_other_sentinels = min_other_sentinels
  84. self.connection_kwargs = connection_kwargs
  85. def master_for(self, service_name, redis_class):
  86. return random.choice(self.sentinels)
  87. class redis(object):
  88. StrictRedis = Redis
  89. class ConnectionPool(object):
  90. def __init__(self, **kwargs):
  91. pass
  92. class UnixDomainSocketConnection(object):
  93. def __init__(self, **kwargs):
  94. pass
  95. class sentinel(object):
  96. Sentinel = Sentinel
  97. class test_RedisResultConsumer:
  98. def get_backend(self):
  99. from celery.backends.redis import RedisBackend
  100. class _RedisBackend(RedisBackend):
  101. redis = redis
  102. return _RedisBackend(app=self.app)
  103. def get_consumer(self):
  104. return self.get_backend().result_consumer
  105. @patch('celery.backends.asynchronous.BaseResultConsumer.on_after_fork')
  106. def test_on_after_fork(self, parent_method):
  107. consumer = self.get_consumer()
  108. consumer.start('none')
  109. consumer.on_after_fork()
  110. parent_method.assert_called_once()
  111. consumer.backend.client.connection_pool.reset.assert_called_once()
  112. consumer._pubsub.close.assert_called_once()
  113. # PubSub instance not initialized - exception would be raised
  114. # when calling .close()
  115. consumer._pubsub = None
  116. parent_method.reset_mock()
  117. consumer.backend.client.connection_pool.reset.reset_mock()
  118. consumer.on_after_fork()
  119. parent_method.assert_called_once()
  120. consumer.backend.client.connection_pool.reset.assert_called_once()
  121. # Continues on KeyError
  122. consumer._pubsub = Mock()
  123. consumer._pubsub.close = Mock(side_effect=KeyError)
  124. parent_method.reset_mock()
  125. consumer.backend.client.connection_pool.reset.reset_mock()
  126. consumer.on_after_fork()
  127. parent_method.assert_called_once()
  128. @patch('celery.backends.redis.ResultConsumer.cancel_for')
  129. @patch('celery.backends.asynchronous.BaseResultConsumer.on_state_change')
  130. def test_on_state_change(self, parent_method, cancel_for):
  131. consumer = self.get_consumer()
  132. meta = {'task_id': 'testing', 'status': states.SUCCESS}
  133. message = 'hello'
  134. consumer.on_state_change(meta, message)
  135. parent_method.assert_called_once_with(meta, message)
  136. cancel_for.assert_called_once_with(meta['task_id'])
  137. # Does not call cancel_for for other states
  138. meta = {'task_id': 'testing2', 'status': states.PENDING}
  139. parent_method.reset_mock()
  140. cancel_for.reset_mock()
  141. consumer.on_state_change(meta, message)
  142. parent_method.assert_called_once_with(meta, message)
  143. cancel_for.assert_not_called()
  144. class test_RedisBackend:
  145. def get_backend(self):
  146. from celery.backends.redis import RedisBackend
  147. class _RedisBackend(RedisBackend):
  148. redis = redis
  149. return _RedisBackend
  150. def get_E_LOST(self):
  151. from celery.backends.redis import E_LOST
  152. return E_LOST
  153. def setup(self):
  154. self.Backend = self.get_backend()
  155. self.E_LOST = self.get_E_LOST()
  156. self.b = self.Backend(app=self.app)
  157. @pytest.mark.usefixtures('depends_on_current_app')
  158. @skip.unless_module('redis')
  159. def test_reduce(self):
  160. from celery.backends.redis import RedisBackend
  161. x = RedisBackend(app=self.app)
  162. assert loads(dumps(x))
  163. def test_no_redis(self):
  164. self.Backend.redis = None
  165. with pytest.raises(ImproperlyConfigured):
  166. self.Backend(app=self.app)
  167. def test_url(self):
  168. self.app.conf.redis_socket_timeout = 30.0
  169. self.app.conf.redis_socket_connect_timeout = 100.0
  170. x = self.Backend(
  171. 'redis://:bosco@vandelay.com:123//1', app=self.app,
  172. )
  173. assert x.connparams
  174. assert x.connparams['host'] == 'vandelay.com'
  175. assert x.connparams['db'] == 1
  176. assert x.connparams['port'] == 123
  177. assert x.connparams['password'] == 'bosco'
  178. assert x.connparams['socket_timeout'] == 30.0
  179. assert x.connparams['socket_connect_timeout'] == 100.0
  180. @skip.unless_module('redis')
  181. def test_timeouts_in_url_coerced(self):
  182. x = self.Backend(
  183. ('redis://:bosco@vandelay.com:123//1?'
  184. 'socket_timeout=30&socket_connect_timeout=100'),
  185. app=self.app,
  186. )
  187. assert x.connparams
  188. assert x.connparams['host'] == 'vandelay.com'
  189. assert x.connparams['db'] == 1
  190. assert x.connparams['port'] == 123
  191. assert x.connparams['password'] == 'bosco'
  192. assert x.connparams['socket_timeout'] == 30
  193. assert x.connparams['socket_connect_timeout'] == 100
  194. def test_socket_url(self):
  195. self.app.conf.redis_socket_timeout = 30.0
  196. self.app.conf.redis_socket_connect_timeout = 100.0
  197. x = self.Backend(
  198. 'socket:///tmp/redis.sock?virtual_host=/3', app=self.app,
  199. )
  200. assert x.connparams
  201. assert x.connparams['path'] == '/tmp/redis.sock'
  202. assert (x.connparams['connection_class'] is
  203. redis.UnixDomainSocketConnection)
  204. assert 'host' not in x.connparams
  205. assert 'port' not in x.connparams
  206. assert x.connparams['socket_timeout'] == 30.0
  207. assert 'socket_connect_timeout' not in x.connparams
  208. assert x.connparams['db'] == 3
  209. @skip.unless_module('redis')
  210. def test_backend_ssl(self):
  211. self.app.conf.redis_backend_use_ssl = {
  212. 'ssl_cert_reqs': ssl.CERT_REQUIRED,
  213. 'ssl_ca_certs': '/path/to/ca.crt',
  214. 'ssl_certfile': '/path/to/client.crt',
  215. 'ssl_keyfile': '/path/to/client.key',
  216. }
  217. self.app.conf.redis_socket_timeout = 30.0
  218. self.app.conf.redis_socket_connect_timeout = 100.0
  219. x = self.Backend(
  220. 'redis://:bosco@vandelay.com:123//1', app=self.app,
  221. )
  222. assert x.connparams
  223. assert x.connparams['host'] == 'vandelay.com'
  224. assert x.connparams['db'] == 1
  225. assert x.connparams['port'] == 123
  226. assert x.connparams['password'] == 'bosco'
  227. assert x.connparams['socket_timeout'] == 30.0
  228. assert x.connparams['socket_connect_timeout'] == 100.0
  229. assert x.connparams['ssl_cert_reqs'] == ssl.CERT_REQUIRED
  230. assert x.connparams['ssl_ca_certs'] == '/path/to/ca.crt'
  231. assert x.connparams['ssl_certfile'] == '/path/to/client.crt'
  232. assert x.connparams['ssl_keyfile'] == '/path/to/client.key'
  233. from redis.connection import SSLConnection
  234. assert x.connparams['connection_class'] is SSLConnection
  235. @skip.unless_module('redis')
  236. def test_backend_ssl_url(self):
  237. self.app.conf.redis_socket_timeout = 30.0
  238. self.app.conf.redis_socket_connect_timeout = 100.0
  239. x = self.Backend(
  240. 'rediss://:bosco@vandelay.com:123//1?ssl_cert_reqs=CERT_REQUIRED',
  241. app=self.app,
  242. )
  243. assert x.connparams
  244. assert x.connparams['host'] == 'vandelay.com'
  245. assert x.connparams['db'] == 1
  246. assert x.connparams['port'] == 123
  247. assert x.connparams['password'] == 'bosco'
  248. assert x.connparams['socket_timeout'] == 30.0
  249. assert x.connparams['socket_connect_timeout'] == 100.0
  250. assert x.connparams['ssl_cert_reqs'] == ssl.CERT_REQUIRED
  251. from redis.connection import SSLConnection
  252. assert x.connparams['connection_class'] is SSLConnection
  253. @skip.unless_module('redis')
  254. def test_backend_ssl_url_options(self):
  255. x = self.Backend(
  256. (
  257. 'rediss://:bosco@vandelay.com:123//1?ssl_cert_reqs=CERT_NONE'
  258. '&ssl_ca_certs=%2Fvar%2Fssl%2Fmyca.pem'
  259. '&ssl_certfile=%2Fvar%2Fssl%2Fredis-server-cert.pem'
  260. '&ssl_keyfile=%2Fvar%2Fssl%2Fprivate%2Fworker-key.pem'
  261. ),
  262. app=self.app,
  263. )
  264. assert x.connparams
  265. assert x.connparams['host'] == 'vandelay.com'
  266. assert x.connparams['db'] == 1
  267. assert x.connparams['port'] == 123
  268. assert x.connparams['password'] == 'bosco'
  269. assert x.connparams['ssl_cert_reqs'] == ssl.CERT_NONE
  270. assert x.connparams['ssl_ca_certs'] == '/var/ssl/myca.pem'
  271. assert x.connparams['ssl_certfile'] == '/var/ssl/redis-server-cert.pem'
  272. assert x.connparams['ssl_keyfile'] == '/var/ssl/private/worker-key.pem'
  273. @skip.unless_module('redis')
  274. def test_backend_ssl_url_cert_none(self):
  275. x = self.Backend(
  276. 'rediss://:bosco@vandelay.com:123//1?ssl_cert_reqs=CERT_OPTIONAL',
  277. app=self.app,
  278. )
  279. assert x.connparams
  280. assert x.connparams['host'] == 'vandelay.com'
  281. assert x.connparams['db'] == 1
  282. assert x.connparams['port'] == 123
  283. assert x.connparams['ssl_cert_reqs'] == ssl.CERT_OPTIONAL
  284. from redis.connection import SSLConnection
  285. assert x.connparams['connection_class'] is SSLConnection
  286. @skip.unless_module('redis')
  287. @pytest.mark.parametrize("uri", [
  288. 'rediss://:bosco@vandelay.com:123//1?ssl_cert_reqs=CERT_KITTY_CATS',
  289. 'rediss://:bosco@vandelay.com:123//1'
  290. ])
  291. def test_backend_ssl_url_invalid(self, uri):
  292. with pytest.raises(ValueError):
  293. self.Backend(
  294. uri,
  295. app=self.app,
  296. )
  297. def test_compat_propertie(self):
  298. x = self.Backend(
  299. 'redis://:bosco@vandelay.com:123//1', app=self.app,
  300. )
  301. with pytest.warns(CPendingDeprecationWarning):
  302. assert x.host == 'vandelay.com'
  303. with pytest.warns(CPendingDeprecationWarning):
  304. assert x.db == 1
  305. with pytest.warns(CPendingDeprecationWarning):
  306. assert x.port == 123
  307. with pytest.warns(CPendingDeprecationWarning):
  308. assert x.password == 'bosco'
  309. def test_conf_raises_KeyError(self):
  310. self.app.conf = AttributeDict({
  311. 'result_serializer': 'json',
  312. 'result_cache_max': 1,
  313. 'result_expires': None,
  314. 'accept_content': ['json'],
  315. })
  316. self.Backend(app=self.app)
  317. @patch('celery.backends.redis.logger')
  318. def test_on_connection_error(self, logger):
  319. intervals = iter([10, 20, 30])
  320. exc = KeyError()
  321. assert self.b.on_connection_error(None, exc, intervals, 1) == 10
  322. logger.error.assert_called_with(
  323. self.E_LOST, 1, 'Inf', 'in 10.00 seconds')
  324. assert self.b.on_connection_error(10, exc, intervals, 2) == 20
  325. logger.error.assert_called_with(self.E_LOST, 2, 10, 'in 20.00 seconds')
  326. assert self.b.on_connection_error(10, exc, intervals, 3) == 30
  327. logger.error.assert_called_with(self.E_LOST, 3, 10, 'in 30.00 seconds')
  328. def test_incr(self):
  329. self.b.client = Mock(name='client')
  330. self.b.incr('foo')
  331. self.b.client.incr.assert_called_with('foo')
  332. def test_expire(self):
  333. self.b.client = Mock(name='client')
  334. self.b.expire('foo', 300)
  335. self.b.client.expire.assert_called_with('foo', 300)
  336. def test_apply_chord(self, unlock='celery.chord_unlock'):
  337. self.app.tasks[unlock] = Mock()
  338. header_result = self.app.GroupResult(
  339. uuid(),
  340. [self.app.AsyncResult(x) for x in range(3)],
  341. )
  342. self.b.apply_chord(header_result, None)
  343. assert self.app.tasks[unlock].apply_async.call_count == 0
  344. def test_unpack_chord_result(self):
  345. self.b.exception_to_python = Mock(name='etp')
  346. decode = Mock(name='decode')
  347. exc = KeyError()
  348. tup = decode.return_value = (1, 'id1', states.FAILURE, exc)
  349. with pytest.raises(ChordError):
  350. self.b._unpack_chord_result(tup, decode)
  351. decode.assert_called_with(tup)
  352. self.b.exception_to_python.assert_called_with(exc)
  353. exc = ValueError()
  354. tup = decode.return_value = (2, 'id2', states.RETRY, exc)
  355. ret = self.b._unpack_chord_result(tup, decode)
  356. self.b.exception_to_python.assert_called_with(exc)
  357. assert ret is self.b.exception_to_python()
  358. def test_on_chord_part_return_no_gid_or_tid(self):
  359. request = Mock(name='request')
  360. request.id = request.group = None
  361. assert self.b.on_chord_part_return(request, 'SUCCESS', 10) is None
  362. def test_ConnectionPool(self):
  363. self.b.redis = Mock(name='redis')
  364. assert self.b._ConnectionPool is None
  365. assert self.b.ConnectionPool is self.b.redis.ConnectionPool
  366. assert self.b.ConnectionPool is self.b.redis.ConnectionPool
  367. def test_expires_defaults_to_config(self):
  368. self.app.conf.result_expires = 10
  369. b = self.Backend(expires=None, app=self.app)
  370. assert b.expires == 10
  371. def test_expires_is_int(self):
  372. b = self.Backend(expires=48, app=self.app)
  373. assert b.expires == 48
  374. def test_add_to_chord(self):
  375. b = self.Backend('redis://', app=self.app)
  376. gid = uuid()
  377. b.add_to_chord(gid, 'sig')
  378. b.client.incr.assert_called_with(b.get_key_for_group(gid, '.t'), 1)
  379. def test_expires_is_None(self):
  380. b = self.Backend(expires=None, app=self.app)
  381. assert b.expires == self.app.conf.result_expires.total_seconds()
  382. def test_expires_is_timedelta(self):
  383. b = self.Backend(expires=timedelta(minutes=1), app=self.app)
  384. assert b.expires == 60
  385. def test_mget(self):
  386. assert self.b.mget(['a', 'b', 'c'])
  387. self.b.client.mget.assert_called_with(['a', 'b', 'c'])
  388. def test_set_no_expire(self):
  389. self.b.expires = None
  390. self.b.set('foo', 'bar')
  391. def create_task(self):
  392. tid = uuid()
  393. task = Mock(name='task-{0}'.format(tid))
  394. task.name = 'foobarbaz'
  395. self.app.tasks['foobarbaz'] = task
  396. task.request.chord = signature(task)
  397. task.request.id = tid
  398. task.request.chord['chord_size'] = 10
  399. task.request.group = 'group_id'
  400. return task
  401. @patch('celery.result.GroupResult.restore')
  402. def test_on_chord_part_return(self, restore):
  403. tasks = [self.create_task() for i in range(10)]
  404. for i in range(10):
  405. self.b.on_chord_part_return(tasks[i].request, states.SUCCESS, i)
  406. assert self.b.client.rpush.call_count
  407. self.b.client.rpush.reset_mock()
  408. assert self.b.client.lrange.call_count
  409. jkey = self.b.get_key_for_group('group_id', '.j')
  410. tkey = self.b.get_key_for_group('group_id', '.t')
  411. self.b.client.delete.assert_has_calls([call(jkey), call(tkey)])
  412. self.b.client.expire.assert_has_calls([
  413. call(jkey, 86400), call(tkey, 86400),
  414. ])
  415. def test_on_chord_part_return__success(self):
  416. with self.chord_context(2) as (_, request, callback):
  417. self.b.on_chord_part_return(request, states.SUCCESS, 10)
  418. callback.delay.assert_not_called()
  419. self.b.on_chord_part_return(request, states.SUCCESS, 20)
  420. callback.delay.assert_called_with([10, 20])
  421. def test_on_chord_part_return__callback_raises(self):
  422. with self.chord_context(1) as (_, request, callback):
  423. callback.delay.side_effect = KeyError(10)
  424. task = self.app._tasks['add'] = Mock(name='add_task')
  425. self.b.on_chord_part_return(request, states.SUCCESS, 10)
  426. task.backend.fail_from_current_stack.assert_called_with(
  427. callback.id, exc=ANY,
  428. )
  429. def test_on_chord_part_return__ChordError(self):
  430. with self.chord_context(1) as (_, request, callback):
  431. self.b.client.pipeline = ContextMock()
  432. raise_on_second_call(self.b.client.pipeline, ChordError())
  433. self.b.client.pipeline.return_value.rpush().llen().get().expire(
  434. ).expire().execute.return_value = (1, 1, 0, 4, 5)
  435. task = self.app._tasks['add'] = Mock(name='add_task')
  436. self.b.on_chord_part_return(request, states.SUCCESS, 10)
  437. task.backend.fail_from_current_stack.assert_called_with(
  438. callback.id, exc=ANY,
  439. )
  440. def test_on_chord_part_return__other_error(self):
  441. with self.chord_context(1) as (_, request, callback):
  442. self.b.client.pipeline = ContextMock()
  443. raise_on_second_call(self.b.client.pipeline, RuntimeError())
  444. self.b.client.pipeline.return_value.rpush().llen().get().expire(
  445. ).expire().execute.return_value = (1, 1, 0, 4, 5)
  446. task = self.app._tasks['add'] = Mock(name='add_task')
  447. self.b.on_chord_part_return(request, states.SUCCESS, 10)
  448. task.backend.fail_from_current_stack.assert_called_with(
  449. callback.id, exc=ANY,
  450. )
  451. @contextmanager
  452. def chord_context(self, size=1):
  453. with patch('celery.backends.redis.maybe_signature') as ms:
  454. tasks = [self.create_task() for i in range(size)]
  455. request = Mock(name='request')
  456. request.id = 'id1'
  457. request.group = 'gid1'
  458. callback = ms.return_value = Signature('add')
  459. callback.id = 'id1'
  460. callback['chord_size'] = size
  461. callback.delay = Mock(name='callback.delay')
  462. yield tasks, request, callback
  463. def test_process_cleanup(self):
  464. self.b.process_cleanup()
  465. def test_get_set_forget(self):
  466. tid = uuid()
  467. self.b.store_result(tid, 42, states.SUCCESS)
  468. assert self.b.get_state(tid) == states.SUCCESS
  469. assert self.b.get_result(tid) == 42
  470. self.b.forget(tid)
  471. assert self.b.get_state(tid) == states.PENDING
  472. def test_set_expires(self):
  473. self.b = self.Backend(expires=512, app=self.app)
  474. tid = uuid()
  475. key = self.b.get_key_for_task(tid)
  476. self.b.store_result(tid, 42, states.SUCCESS)
  477. self.b.client.expire.assert_called_with(
  478. key, 512,
  479. )
  480. class test_SentinelBackend:
  481. def get_backend(self):
  482. from celery.backends.redis import SentinelBackend
  483. class _SentinelBackend(SentinelBackend):
  484. redis = redis
  485. sentinel = sentinel
  486. return _SentinelBackend
  487. def get_E_LOST(self):
  488. from celery.backends.redis import E_LOST
  489. return E_LOST
  490. def setup(self):
  491. self.Backend = self.get_backend()
  492. self.E_LOST = self.get_E_LOST()
  493. self.b = self.Backend(app=self.app)
  494. @pytest.mark.usefixtures('depends_on_current_app')
  495. @skip.unless_module('redis')
  496. def test_reduce(self):
  497. from celery.backends.redis import SentinelBackend
  498. x = SentinelBackend(app=self.app)
  499. assert loads(dumps(x))
  500. def test_no_redis(self):
  501. self.Backend.redis = None
  502. with pytest.raises(ImproperlyConfigured):
  503. self.Backend(app=self.app)
  504. def test_url(self):
  505. self.app.conf.redis_socket_timeout = 30.0
  506. self.app.conf.redis_socket_connect_timeout = 100.0
  507. x = self.Backend(
  508. 'sentinel://:test@github.com:123/1;'
  509. 'sentinel://:test@github.com:124/1',
  510. app=self.app,
  511. )
  512. assert x.connparams
  513. assert "host" not in x.connparams
  514. assert x.connparams['db'] == 1
  515. assert "port" not in x.connparams
  516. assert x.connparams['password'] == "test"
  517. assert len(x.connparams['hosts']) == 2
  518. expected_hosts = ["github.com", "github.com"]
  519. found_hosts = [cp['host'] for cp in x.connparams['hosts']]
  520. assert found_hosts == expected_hosts
  521. expected_ports = [123, 124]
  522. found_ports = [cp['port'] for cp in x.connparams['hosts']]
  523. assert found_ports == expected_ports
  524. expected_passwords = ["test", "test"]
  525. found_passwords = [cp['password'] for cp in x.connparams['hosts']]
  526. assert found_passwords == expected_passwords
  527. expected_dbs = [1, 1]
  528. found_dbs = [cp['db'] for cp in x.connparams['hosts']]
  529. assert found_dbs == expected_dbs
  530. def test_get_sentinel_instance(self):
  531. x = self.Backend(
  532. 'sentinel://:test@github.com:123/1;'
  533. 'sentinel://:test@github.com:124/1',
  534. app=self.app,
  535. )
  536. sentinel_instance = x._get_sentinel_instance(**x.connparams)
  537. assert sentinel_instance.sentinel_kwargs == {}
  538. assert sentinel_instance.connection_kwargs['db'] == 1
  539. assert sentinel_instance.connection_kwargs['password'] == "test"
  540. assert len(sentinel_instance.sentinels) == 2
  541. def test_get_pool(self):
  542. x = self.Backend(
  543. 'sentinel://:test@github.com:123/1;'
  544. 'sentinel://:test@github.com:124/1',
  545. app=self.app,
  546. )
  547. pool = x._get_pool(**x.connparams)
  548. assert pool