# test_base.py
  1. from __future__ import absolute_import, unicode_literals
  2. import sys
  3. import types
  4. from contextlib import contextmanager
  5. import pytest
  6. from case import ANY, Mock, call, patch, skip
  7. from celery import chord, group, states, uuid
  8. from celery.app.task import Context
  9. from celery.backends.base import (BaseBackend, DisabledBackend,
  10. KeyValueStoreBackend, _nulldict)
  11. from celery.exceptions import ChordError, TimeoutError
  12. from celery.five import bytes_if_py2, items, range
  13. from celery.result import result_from_tuple
  14. from celery.utils import serialization
  15. from celery.utils.functional import pass1
  16. from celery.utils.serialization import UnpickleableExceptionWrapper
  17. from celery.utils.serialization import find_pickleable_exception as fnpe
  18. from celery.utils.serialization import get_pickleable_exception as gpe
  19. from celery.utils.serialization import subclass_exception
  20. class wrapobject(object):
  21. def __init__(self, *args, **kwargs):
  22. self.args = args
  23. if sys.version_info[0] == 3 or getattr(sys, 'pypy_version_info', None):
  24. Oldstyle = None
  25. else:
  26. Oldstyle = types.ClassType(bytes_if_py2('Oldstyle'), (), {})
  27. Unpickleable = subclass_exception(
  28. bytes_if_py2('Unpickleable'), KeyError, 'foo.module',
  29. )
  30. Impossible = subclass_exception(
  31. bytes_if_py2('Impossible'), object, 'foo.module',
  32. )
  33. Lookalike = subclass_exception(
  34. bytes_if_py2('Lookalike'), wrapobject, 'foo.module',
  35. )
  36. class test_nulldict:
  37. def test_nulldict(self):
  38. x = _nulldict()
  39. x['foo'] = 1
  40. x.update(foo=1, bar=2)
  41. x.setdefault('foo', 3)
  42. class test_serialization:
  43. def test_create_exception_cls(self):
  44. assert serialization.create_exception_cls('FooError', 'm')
  45. assert serialization.create_exception_cls('FooError', 'm', KeyError)
  46. class test_BaseBackend_interface:
  47. def setup(self):
  48. self.b = BaseBackend(self.app)
  49. @self.app.task(shared=False)
  50. def callback(result):
  51. pass
  52. self.callback = callback
  53. def test__forget(self):
  54. with pytest.raises(NotImplementedError):
  55. self.b._forget('SOMExx-N0Nex1stant-IDxx-')
  56. def test_forget(self):
  57. with pytest.raises(NotImplementedError):
  58. self.b.forget('SOMExx-N0nex1stant-IDxx-')
  59. def test_on_chord_part_return(self):
  60. self.b.on_chord_part_return(None, None, None)
  61. def test_apply_chord(self, unlock='celery.chord_unlock'):
  62. self.app.tasks[unlock] = Mock()
  63. header_result = self.app.GroupResult(
  64. uuid(),
  65. [self.app.AsyncResult(x) for x in range(3)],
  66. )
  67. self.b.apply_chord(header_result, self.callback.s())
  68. assert self.app.tasks[unlock].apply_async.call_count
  69. def test_chord_unlock_queue(self, unlock='celery.chord_unlock'):
  70. self.app.tasks[unlock] = Mock()
  71. header_result = self.app.GroupResult(
  72. uuid(),
  73. [self.app.AsyncResult(x) for x in range(3)],
  74. )
  75. body = self.callback.s()
  76. self.b.apply_chord(header_result, body)
  77. called_kwargs = self.app.tasks[unlock].apply_async.call_args[1]
  78. assert called_kwargs['queue'] is None
  79. self.b.apply_chord(header_result, body.set(queue='test_queue'))
  80. called_kwargs = self.app.tasks[unlock].apply_async.call_args[1]
  81. assert called_kwargs['queue'] == 'test_queue'
  82. @self.app.task(shared=False, queue='test_queue_two')
  83. def callback_queue(result):
  84. pass
  85. self.b.apply_chord(header_result, callback_queue.s())
  86. called_kwargs = self.app.tasks[unlock].apply_async.call_args[1]
  87. assert called_kwargs['queue'] == 'test_queue_two'
  88. class test_exception_pickle:
  89. @skip.if_python3(reason='does not support old style classes')
  90. @skip.if_pypy()
  91. def test_oldstyle(self):
  92. assert fnpe(Oldstyle())
  93. def test_BaseException(self):
  94. assert fnpe(Exception()) is None
  95. def test_get_pickleable_exception(self):
  96. exc = Exception('foo')
  97. assert gpe(exc) == exc
  98. def test_unpickleable(self):
  99. assert isinstance(fnpe(Unpickleable()), KeyError)
  100. assert fnpe(Impossible()) is None
  101. class test_prepare_exception:
  102. def setup(self):
  103. self.b = BaseBackend(self.app)
  104. def test_unpickleable(self):
  105. self.b.serializer = 'pickle'
  106. x = self.b.prepare_exception(Unpickleable(1, 2, 'foo'))
  107. assert isinstance(x, KeyError)
  108. y = self.b.exception_to_python(x)
  109. assert isinstance(y, KeyError)
  110. def test_json_exception_arguments(self):
  111. self.b.serializer = 'json'
  112. x = self.b.prepare_exception(Exception(object))
  113. assert x == {
  114. 'exc_message': serialization.ensure_serializable(
  115. (object,), self.b.encode),
  116. 'exc_type': Exception.__name__,
  117. 'exc_module': Exception.__module__}
  118. y = self.b.exception_to_python(x)
  119. assert isinstance(y, Exception)
  120. def test_impossible(self):
  121. self.b.serializer = 'pickle'
  122. x = self.b.prepare_exception(Impossible())
  123. assert isinstance(x, UnpickleableExceptionWrapper)
  124. assert str(x)
  125. y = self.b.exception_to_python(x)
  126. assert y.__class__.__name__ == 'Impossible'
  127. if sys.version_info < (2, 5):
  128. assert y.__class__.__module__
  129. else:
  130. assert y.__class__.__module__ == 'foo.module'
  131. def test_regular(self):
  132. self.b.serializer = 'pickle'
  133. x = self.b.prepare_exception(KeyError('baz'))
  134. assert isinstance(x, KeyError)
  135. y = self.b.exception_to_python(x)
  136. assert isinstance(y, KeyError)
  137. def test_unicode_message(self):
  138. message = u'\u03ac'
  139. x = self.b.prepare_exception(Exception(message))
  140. assert x == {'exc_message': (message,),
  141. 'exc_type': Exception.__name__,
  142. 'exc_module': Exception.__module__}
  143. class KVBackend(KeyValueStoreBackend):
  144. mget_returns_dict = False
  145. def __init__(self, app, *args, **kwargs):
  146. self.db = {}
  147. super(KVBackend, self).__init__(app)
  148. def get(self, key):
  149. return self.db.get(key)
  150. def set(self, key, value):
  151. self.db[key] = value
  152. def mget(self, keys):
  153. if self.mget_returns_dict:
  154. return {key: self.get(key) for key in keys}
  155. else:
  156. return [self.get(k) for k in keys]
  157. def delete(self, key):
  158. self.db.pop(key, None)
  159. class DictBackend(BaseBackend):
  160. def __init__(self, *args, **kwargs):
  161. BaseBackend.__init__(self, *args, **kwargs)
  162. self._data = {'can-delete': {'result': 'foo'}}
  163. def _restore_group(self, group_id):
  164. if group_id == 'exists':
  165. return {'result': 'group'}
  166. def _get_task_meta_for(self, task_id):
  167. if task_id == 'task-exists':
  168. return {'result': 'task'}
  169. def _delete_group(self, group_id):
  170. self._data.pop(group_id, None)
  171. class test_BaseBackend_dict:
  172. def setup(self):
  173. self.b = DictBackend(app=self.app)
  174. @self.app.task(shared=False, bind=True)
  175. def bound_errback(self, result):
  176. pass
  177. @self.app.task(shared=False)
  178. def errback(arg1, arg2):
  179. errback.last_result = arg1 + arg2
  180. self.bound_errback = bound_errback
  181. self.errback = errback
  182. def test_delete_group(self):
  183. self.b.delete_group('can-delete')
  184. assert 'can-delete' not in self.b._data
  185. def test_prepare_exception_json(self):
  186. x = DictBackend(self.app, serializer='json')
  187. e = x.prepare_exception(KeyError('foo'))
  188. assert 'exc_type' in e
  189. e = x.exception_to_python(e)
  190. assert e.__class__.__name__ == 'KeyError'
  191. assert str(e).strip('u') == "'foo'"
  192. def test_save_group(self):
  193. b = BaseBackend(self.app)
  194. b._save_group = Mock()
  195. b.save_group('foofoo', 'xxx')
  196. b._save_group.assert_called_with('foofoo', 'xxx')
  197. def test_add_to_chord_interface(self):
  198. b = BaseBackend(self.app)
  199. with pytest.raises(NotImplementedError):
  200. b.add_to_chord('group_id', 'sig')
  201. def test_forget_interface(self):
  202. b = BaseBackend(self.app)
  203. with pytest.raises(NotImplementedError):
  204. b.forget('foo')
  205. def test_restore_group(self):
  206. assert self.b.restore_group('missing') is None
  207. assert self.b.restore_group('missing') is None
  208. assert self.b.restore_group('exists') == 'group'
  209. assert self.b.restore_group('exists') == 'group'
  210. assert self.b.restore_group('exists', cache=False) == 'group'
  211. def test_reload_group_result(self):
  212. self.b._cache = {}
  213. self.b.reload_group_result('exists')
  214. self.b._cache['exists'] = {'result': 'group'}
  215. def test_reload_task_result(self):
  216. self.b._cache = {}
  217. self.b.reload_task_result('task-exists')
  218. self.b._cache['task-exists'] = {'result': 'task'}
  219. def test_fail_from_current_stack(self):
  220. self.b.mark_as_failure = Mock()
  221. try:
  222. raise KeyError('foo')
  223. except KeyError as exc:
  224. self.b.fail_from_current_stack('task_id')
  225. self.b.mark_as_failure.assert_called()
  226. args = self.b.mark_as_failure.call_args[0]
  227. assert args[0] == 'task_id'
  228. assert args[1] is exc
  229. assert args[2]
  230. def test_prepare_value_serializes_group_result(self):
  231. self.b.serializer = 'json'
  232. g = self.app.GroupResult('group_id', [self.app.AsyncResult('foo')])
  233. v = self.b.prepare_value(g)
  234. assert isinstance(v, (list, tuple))
  235. assert result_from_tuple(v, app=self.app) == g
  236. v2 = self.b.prepare_value(g[0])
  237. assert isinstance(v2, (list, tuple))
  238. assert result_from_tuple(v2, app=self.app) == g[0]
  239. self.b.serializer = 'pickle'
  240. assert isinstance(self.b.prepare_value(g), self.app.GroupResult)
  241. def test_is_cached(self):
  242. b = BaseBackend(app=self.app, max_cached_results=1)
  243. b._cache['foo'] = 1
  244. assert b.is_cached('foo')
  245. assert not b.is_cached('false')
  246. def test_mark_as_done__chord(self):
  247. b = BaseBackend(app=self.app)
  248. b._store_result = Mock()
  249. request = Mock(name='request')
  250. b.on_chord_part_return = Mock()
  251. b.mark_as_done('id', 10, request=request)
  252. b.on_chord_part_return.assert_called_with(request, states.SUCCESS, 10)
  253. def test_mark_as_failure__bound_errback(self):
  254. b = BaseBackend(app=self.app)
  255. b._store_result = Mock()
  256. request = Mock(name='request')
  257. request.errbacks = [
  258. self.bound_errback.subtask(args=[1], immutable=True)]
  259. exc = KeyError()
  260. group = self.patching('celery.backends.base.group')
  261. b.mark_as_failure('id', exc, request=request)
  262. group.assert_called_with(request.errbacks, app=self.app)
  263. group.return_value.apply_async.assert_called_with(
  264. (request.id, ), parent_id=request.id, root_id=request.root_id)
  265. def test_mark_as_failure__errback(self):
  266. b = BaseBackend(app=self.app)
  267. b._store_result = Mock()
  268. request = Mock(name='request')
  269. request.errbacks = [self.errback.subtask(args=[2, 3], immutable=True)]
  270. exc = KeyError()
  271. b.mark_as_failure('id', exc, request=request)
  272. assert self.errback.last_result == 5
  273. def test_mark_as_failure__chord(self):
  274. b = BaseBackend(app=self.app)
  275. b._store_result = Mock()
  276. request = Mock(name='request')
  277. request.errbacks = []
  278. b.on_chord_part_return = Mock()
  279. exc = KeyError()
  280. b.mark_as_failure('id', exc, request=request)
  281. b.on_chord_part_return.assert_called_with(request, states.FAILURE, exc)
  282. def test_mark_as_revoked__chord(self):
  283. b = BaseBackend(app=self.app)
  284. b._store_result = Mock()
  285. request = Mock(name='request')
  286. request.errbacks = []
  287. b.on_chord_part_return = Mock()
  288. b.mark_as_revoked('id', 'revoked', request=request)
  289. b.on_chord_part_return.assert_called_with(request, states.REVOKED, ANY)
  290. def test_chord_error_from_stack_raises(self):
  291. b = BaseBackend(app=self.app)
  292. exc = KeyError()
  293. callback = Mock(name='callback')
  294. callback.options = {'link_error': []}
  295. task = self.app.tasks[callback.task] = Mock()
  296. b.fail_from_current_stack = Mock()
  297. group = self.patching('celery.group')
  298. group.side_effect = exc
  299. b.chord_error_from_stack(callback, exc=ValueError())
  300. task.backend.fail_from_current_stack.assert_called_with(
  301. callback.id, exc=exc)
  302. def test_exception_to_python_when_None(self):
  303. b = BaseBackend(app=self.app)
  304. assert b.exception_to_python(None) is None
  305. def test_wait_for__on_interval(self):
  306. self.patching('time.sleep')
  307. b = BaseBackend(app=self.app)
  308. b._get_task_meta_for = Mock()
  309. b._get_task_meta_for.return_value = {'status': states.PENDING}
  310. callback = Mock(name='callback')
  311. with pytest.raises(TimeoutError):
  312. b.wait_for(task_id='1', on_interval=callback, timeout=1)
  313. callback.assert_called_with()
  314. b._get_task_meta_for.return_value = {'status': states.SUCCESS}
  315. b.wait_for(task_id='1', timeout=None)
  316. def test_get_children(self):
  317. b = BaseBackend(app=self.app)
  318. b._get_task_meta_for = Mock()
  319. b._get_task_meta_for.return_value = {}
  320. assert b.get_children('id') is None
  321. b._get_task_meta_for.return_value = {'children': 3}
  322. assert b.get_children('id') == 3
  323. class test_KeyValueStoreBackend:
  324. def setup(self):
  325. self.b = KVBackend(app=self.app)
  326. def test_on_chord_part_return(self):
  327. assert not self.b.implements_incr
  328. self.b.on_chord_part_return(None, None, None)
  329. def test_get_store_delete_result(self):
  330. tid = uuid()
  331. self.b.mark_as_done(tid, 'Hello world')
  332. assert self.b.get_result(tid) == 'Hello world'
  333. assert self.b.get_state(tid) == states.SUCCESS
  334. self.b.forget(tid)
  335. assert self.b.get_state(tid) == states.PENDING
  336. def test_store_result_group_id(self):
  337. tid = uuid()
  338. state = 'SUCCESS'
  339. result = 10
  340. request = Context(group='gid', children=[])
  341. self.b.store_result(
  342. tid, state=state, result=result, request=request,
  343. )
  344. stored_meta = self.b.decode(self.b.get(self.b.get_key_for_task(tid)))
  345. assert stored_meta['group_id'] == request.group
  346. def test_strip_prefix(self):
  347. x = self.b.get_key_for_task('x1b34')
  348. assert self.b._strip_prefix(x) == 'x1b34'
  349. assert self.b._strip_prefix('x1b34') == 'x1b34'
  350. def test_get_many(self):
  351. for is_dict in True, False:
  352. self.b.mget_returns_dict = is_dict
  353. ids = {uuid(): i for i in range(10)}
  354. for id, i in items(ids):
  355. self.b.mark_as_done(id, i)
  356. it = self.b.get_many(list(ids), interval=0.01)
  357. for i, (got_id, got_state) in enumerate(it):
  358. assert got_state['result'] == ids[got_id]
  359. assert i == 9
  360. assert list(self.b.get_many(list(ids), interval=0.01))
  361. self.b._cache.clear()
  362. callback = Mock(name='callback')
  363. it = self.b.get_many(
  364. list(ids),
  365. on_message=callback,
  366. interval=0.05
  367. )
  368. for i, (got_id, got_state) in enumerate(it):
  369. assert got_state['result'] == ids[got_id]
  370. assert i == 9
  371. assert list(
  372. self.b.get_many(list(ids), interval=0.01)
  373. )
  374. callback.assert_has_calls([
  375. call(ANY) for id in ids
  376. ])
  377. def test_get_many_times_out(self):
  378. tasks = [uuid() for _ in range(4)]
  379. self.b._cache[tasks[1]] = {'status': 'PENDING'}
  380. with pytest.raises(self.b.TimeoutError):
  381. list(self.b.get_many(tasks, timeout=0.01, interval=0.01))
  382. def test_chord_part_return_no_gid(self):
  383. self.b.implements_incr = True
  384. task = Mock()
  385. state = 'SUCCESS'
  386. result = 10
  387. task.request.group = None
  388. self.b.get_key_for_chord = Mock()
  389. self.b.get_key_for_chord.side_effect = AssertionError(
  390. 'should not get here',
  391. )
  392. assert self.b.on_chord_part_return(
  393. task.request, state, result) is None
  394. @patch('celery.backends.base.GroupResult')
  395. @patch('celery.backends.base.maybe_signature')
  396. def test_chord_part_return_restore_raises(self, maybe_signature,
  397. GroupResult):
  398. self.b.implements_incr = True
  399. GroupResult.restore.side_effect = KeyError()
  400. self.b.chord_error_from_stack = Mock()
  401. callback = Mock(name='callback')
  402. request = Mock(name='request')
  403. request.group = 'gid'
  404. maybe_signature.return_value = callback
  405. self.b.on_chord_part_return(request, states.SUCCESS, 10)
  406. self.b.chord_error_from_stack.assert_called_with(
  407. callback, ANY,
  408. )
  409. @patch('celery.backends.base.GroupResult')
  410. @patch('celery.backends.base.maybe_signature')
  411. def test_chord_part_return_restore_empty(self, maybe_signature,
  412. GroupResult):
  413. self.b.implements_incr = True
  414. GroupResult.restore.return_value = None
  415. self.b.chord_error_from_stack = Mock()
  416. callback = Mock(name='callback')
  417. request = Mock(name='request')
  418. request.group = 'gid'
  419. maybe_signature.return_value = callback
  420. self.b.on_chord_part_return(request, states.SUCCESS, 10)
  421. self.b.chord_error_from_stack.assert_called_with(
  422. callback, ANY,
  423. )
  424. def test_filter_ready(self):
  425. self.b.decode_result = Mock()
  426. self.b.decode_result.side_effect = pass1
  427. assert len(list(self.b._filter_ready([
  428. (1, {'status': states.RETRY}),
  429. (2, {'status': states.FAILURE}),
  430. (3, {'status': states.SUCCESS}),
  431. ]))) == 2
  432. @contextmanager
  433. def _chord_part_context(self, b):
  434. @self.app.task(shared=False)
  435. def callback(result):
  436. pass
  437. b.implements_incr = True
  438. b.client = Mock()
  439. with patch('celery.backends.base.GroupResult') as GR:
  440. deps = GR.restore.return_value = Mock(name='DEPS')
  441. deps.__len__ = Mock()
  442. deps.__len__.return_value = 10
  443. b.incr = Mock()
  444. b.incr.return_value = 10
  445. b.expire = Mock()
  446. task = Mock()
  447. task.request.group = 'grid'
  448. cb = task.request.chord = callback.s()
  449. task.request.chord.freeze()
  450. callback.backend = b
  451. callback.backend.fail_from_current_stack = Mock()
  452. yield task, deps, cb
  453. def test_chord_part_return_propagate_set(self):
  454. with self._chord_part_context(self.b) as (task, deps, _):
  455. self.b.on_chord_part_return(task.request, 'SUCCESS', 10)
  456. self.b.expire.assert_not_called()
  457. deps.delete.assert_called_with()
  458. deps.join_native.assert_called_with(propagate=True, timeout=3.0)
  459. def test_chord_part_return_propagate_default(self):
  460. with self._chord_part_context(self.b) as (task, deps, _):
  461. self.b.on_chord_part_return(task.request, 'SUCCESS', 10)
  462. self.b.expire.assert_not_called()
  463. deps.delete.assert_called_with()
  464. deps.join_native.assert_called_with(propagate=True, timeout=3.0)
  465. def test_chord_part_return_join_raises_internal(self):
  466. with self._chord_part_context(self.b) as (task, deps, callback):
  467. deps._failed_join_report = lambda: iter([])
  468. deps.join_native.side_effect = KeyError('foo')
  469. self.b.on_chord_part_return(task.request, 'SUCCESS', 10)
  470. self.b.fail_from_current_stack.assert_called()
  471. args = self.b.fail_from_current_stack.call_args
  472. exc = args[1]['exc']
  473. assert isinstance(exc, ChordError)
  474. assert 'foo' in str(exc)
  475. def test_chord_part_return_join_raises_task(self):
  476. b = KVBackend(serializer='pickle', app=self.app)
  477. with self._chord_part_context(b) as (task, deps, callback):
  478. deps._failed_join_report = lambda: iter([
  479. self.app.AsyncResult('culprit'),
  480. ])
  481. deps.join_native.side_effect = KeyError('foo')
  482. b.on_chord_part_return(task.request, 'SUCCESS', 10)
  483. b.fail_from_current_stack.assert_called()
  484. args = b.fail_from_current_stack.call_args
  485. exc = args[1]['exc']
  486. assert isinstance(exc, ChordError)
  487. assert 'Dependency culprit raised' in str(exc)
  488. def test_restore_group_from_json(self):
  489. b = KVBackend(serializer='json', app=self.app)
  490. g = self.app.GroupResult(
  491. 'group_id',
  492. [self.app.AsyncResult('a'), self.app.AsyncResult('b')],
  493. )
  494. b._save_group(g.id, g)
  495. g2 = b._restore_group(g.id)['result']
  496. assert g2 == g
  497. def test_restore_group_from_pickle(self):
  498. b = KVBackend(serializer='pickle', app=self.app)
  499. g = self.app.GroupResult(
  500. 'group_id',
  501. [self.app.AsyncResult('a'), self.app.AsyncResult('b')],
  502. )
  503. b._save_group(g.id, g)
  504. g2 = b._restore_group(g.id)['result']
  505. assert g2 == g
  506. def test_chord_apply_fallback(self):
  507. self.b.implements_incr = False
  508. self.b.fallback_chord_unlock = Mock()
  509. header_result = self.app.GroupResult(
  510. 'group_id',
  511. [self.app.AsyncResult(x) for x in range(3)],
  512. )
  513. self.b.apply_chord(
  514. header_result, 'body', foo=1,
  515. )
  516. self.b.fallback_chord_unlock.assert_called_with(
  517. header_result, 'body', foo=1,
  518. )
  519. def test_get_missing_meta(self):
  520. assert self.b.get_result('xxx-missing') is None
  521. assert self.b.get_state('xxx-missing') == states.PENDING
  522. def test_save_restore_delete_group(self):
  523. tid = uuid()
  524. tsr = self.app.GroupResult(
  525. tid, [self.app.AsyncResult(uuid()) for _ in range(10)],
  526. )
  527. self.b.save_group(tid, tsr)
  528. self.b.restore_group(tid)
  529. assert self.b.restore_group(tid) == tsr
  530. self.b.delete_group(tid)
  531. assert self.b.restore_group(tid) is None
  532. def test_restore_missing_group(self):
  533. assert self.b.restore_group('xxx-nonexistant') is None
  534. class test_KeyValueStoreBackend_interface:
  535. def test_get(self):
  536. with pytest.raises(NotImplementedError):
  537. KeyValueStoreBackend(self.app).get('a')
  538. def test_set(self):
  539. with pytest.raises(NotImplementedError):
  540. KeyValueStoreBackend(self.app).set('a', 1)
  541. def test_incr(self):
  542. with pytest.raises(NotImplementedError):
  543. KeyValueStoreBackend(self.app).incr('a')
  544. def test_cleanup(self):
  545. assert not KeyValueStoreBackend(self.app).cleanup()
  546. def test_delete(self):
  547. with pytest.raises(NotImplementedError):
  548. KeyValueStoreBackend(self.app).delete('a')
  549. def test_mget(self):
  550. with pytest.raises(NotImplementedError):
  551. KeyValueStoreBackend(self.app).mget(['a'])
  552. def test_forget(self):
  553. with pytest.raises(NotImplementedError):
  554. KeyValueStoreBackend(self.app).forget('a')
  555. class test_DisabledBackend:
  556. def test_store_result(self):
  557. DisabledBackend(self.app).store_result()
  558. def test_is_disabled(self):
  559. with pytest.raises(NotImplementedError):
  560. DisabledBackend(self.app).get_state('foo')
  561. def test_as_uri(self):
  562. assert DisabledBackend(self.app).as_uri() == 'disabled://'
  563. @pytest.mark.celery(result_backend='disabled')
  564. def test_chord_raises_error(self):
  565. with pytest.raises(NotImplementedError):
  566. chord(self.add.s(i, i) for i in range(10))(self.add.s([2]))
  567. @pytest.mark.celery(result_backend='disabled')
  568. def test_chain_with_chord_raises_error(self):
  569. with pytest.raises(NotImplementedError):
  570. (self.add.s(2, 2) |
  571. group(self.add.s(2, 2),
  572. self.add.s(5, 6)) | self.add.s()).delay()
  573. class test_as_uri:
  574. def setup(self):
  575. self.b = BaseBackend(
  576. app=self.app,
  577. url='sch://uuuu:pwpw@hostname.dom'
  578. )
  579. def test_as_uri_include_password(self):
  580. assert self.b.as_uri(True) == self.b.url
  581. def test_as_uri_exclude_password(self):
  582. assert self.b.as_uri() == 'sch://uuuu:**@hostname.dom/'