test_result.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041
  1. from __future__ import absolute_import, unicode_literals
  2. import copy
  3. import traceback
  4. from contextlib import contextmanager
  5. import pytest
  6. from case import Mock, call, patch, skip
  7. from celery import states, uuid
  8. from celery.app.task import Context
  9. from celery.backends.base import SyncBackendMixin
  10. from celery.exceptions import (CPendingDeprecationWarning,
  11. ImproperlyConfigured, IncompleteStream,
  12. TimeoutError)
  13. from celery.five import range
  14. from celery.result import (AsyncResult, EagerResult, GroupResult, ResultSet,
  15. assert_will_not_block, result_from_tuple)
  16. from celery.utils.serialization import pickle
# Canned traceback text; ``test_AsyncResult.setup`` stores it as the
# traceback of ``task5`` (a FAILURE result carrying ``KeyError('blue')``).
# NOTE(review): leading whitespace inside this literal may have been lost
# when the file was extracted -- confirm against the original file.
PYTRACEBACK = """\
Traceback (most recent call last):
File "foo.py", line 2, in foofunc
don't matter
File "bar.py", line 3, in barfunc
don't matter
Doesn't matter: really!\
"""
  25. def mock_task(name, state, result, traceback=None):
  26. return {
  27. 'id': uuid(), 'name': name, 'state': state,
  28. 'result': result, 'traceback': traceback,
  29. }
  30. def save_result(app, task):
  31. traceback = task.get('traceback') or 'Some traceback'
  32. if task['state'] == states.SUCCESS:
  33. app.backend.mark_as_done(task['id'], task['result'])
  34. elif task['state'] == states.RETRY:
  35. app.backend.mark_as_retry(
  36. task['id'], task['result'], traceback=traceback,
  37. )
  38. else:
  39. app.backend.mark_as_failure(
  40. task['id'], task['result'], traceback=traceback,
  41. )
  42. def make_mock_group(app, size=10):
  43. tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)]
  44. [save_result(app, task) for task in tasks]
  45. return [app.AsyncResult(task['id']) for task in tasks]
  46. class _MockBackend:
  47. def add_pending_result(self, *args, **kwargs):
  48. return True
  49. def wait_for_pending(self, *args, **kwargs):
  50. return True
  51. class test_AsyncResult:
  52. def setup(self):
  53. self.app.conf.result_cache_max = 100
  54. self.app.conf.result_serializer = 'pickle'
  55. self.app.conf.result_extended = True
  56. self.task1 = mock_task('task1', states.SUCCESS, 'the')
  57. self.task2 = mock_task('task2', states.SUCCESS, 'quick')
  58. self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))
  59. self.task4 = mock_task('task3', states.RETRY, KeyError('red'))
  60. self.task5 = mock_task(
  61. 'task3', states.FAILURE, KeyError('blue'), PYTRACEBACK,
  62. )
  63. for task in (self.task1, self.task2,
  64. self.task3, self.task4, self.task5):
  65. save_result(self.app, task)
  66. @self.app.task(shared=False)
  67. def mytask():
  68. pass
  69. self.mytask = mytask
  70. def test_ignored_getter(self):
  71. result = self.app.AsyncResult(uuid())
  72. assert result.ignored is False
  73. result.__delattr__('_ignored')
  74. assert result.ignored is False
  75. @patch('celery.result.task_join_will_block')
  76. def test_assert_will_not_block(self, task_join_will_block):
  77. task_join_will_block.return_value = True
  78. with pytest.raises(RuntimeError):
  79. assert_will_not_block()
  80. task_join_will_block.return_value = False
  81. assert_will_not_block()
  82. @patch('celery.result.task_join_will_block')
  83. def test_get_sync_subtask_option(self, task_join_will_block):
  84. task_join_will_block.return_value = True
  85. tid = uuid()
  86. backend = _MockBackend()
  87. res_subtask_async = AsyncResult(tid, backend=backend)
  88. with pytest.raises(RuntimeError):
  89. res_subtask_async.get()
  90. res_subtask_async.get(disable_sync_subtasks=False)
  91. def test_without_id(self):
  92. with pytest.raises(ValueError):
  93. AsyncResult(None, app=self.app)
  94. def test_compat_properties(self):
  95. x = self.app.AsyncResult('1')
  96. assert x.task_id == x.id
  97. x.task_id = '2'
  98. assert x.id == '2'
  99. @pytest.mark.usefixtures('depends_on_current_app')
  100. def test_reduce_direct(self):
  101. x = AsyncResult('1', app=self.app)
  102. fun, args = x.__reduce__()
  103. assert fun(*args) == x
  104. def test_children(self):
  105. x = self.app.AsyncResult('1')
  106. children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  107. x._cache = {'children': children, 'status': states.SUCCESS}
  108. x.backend = Mock()
  109. assert x.children
  110. assert len(x.children) == 3
  111. def test_propagates_for_parent(self):
  112. x = self.app.AsyncResult(uuid())
  113. x.backend = Mock(name='backend')
  114. x.backend.get_task_meta.return_value = {}
  115. x.backend.wait_for_pending.return_value = 84
  116. x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
  117. with pytest.raises(KeyError):
  118. x.get(propagate=True)
  119. x.backend.wait_for_pending.assert_not_called()
  120. x.parent = EagerResult(uuid(), 42, states.SUCCESS)
  121. assert x.get(propagate=True) == 84
  122. x.backend.wait_for_pending.assert_called()
  123. def test_get_children(self):
  124. tid = uuid()
  125. x = self.app.AsyncResult(tid)
  126. child = [self.app.AsyncResult(uuid()).as_tuple()
  127. for i in range(10)]
  128. x._cache = {'children': child}
  129. assert x.children
  130. assert len(x.children) == 10
  131. x._cache = {'status': states.SUCCESS}
  132. x.backend._cache[tid] = {'result': None}
  133. assert x.children is None
  134. def test_build_graph_get_leaf_collect(self):
  135. x = self.app.AsyncResult('1')
  136. x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
  137. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  138. x.iterdeps = Mock()
  139. x.iterdeps.return_value = (
  140. (None, x),
  141. (x, c[0]),
  142. (c[0], c[1]),
  143. (c[1], c[2])
  144. )
  145. x.backend.READY_STATES = states.READY_STATES
  146. assert x.graph
  147. assert x.get_leaf() is 2
  148. it = x.collect()
  149. assert list(it) == [
  150. (x, None),
  151. (c[0], 0),
  152. (c[1], 1),
  153. (c[2], 2),
  154. ]
  155. def test_iterdeps(self):
  156. x = self.app.AsyncResult('1')
  157. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  158. x._cache = {'status': states.SUCCESS, 'result': None, 'children': c}
  159. for child in c:
  160. child.backend = Mock()
  161. child.backend.get_children.return_value = []
  162. it = x.iterdeps()
  163. assert list(it) == [
  164. (None, x),
  165. (x, c[0]),
  166. (x, c[1]),
  167. (x, c[2]),
  168. ]
  169. x._cache = None
  170. x.ready = Mock()
  171. x.ready.return_value = False
  172. with pytest.raises(IncompleteStream):
  173. list(x.iterdeps())
  174. list(x.iterdeps(intermediate=True))
  175. def test_eq_not_implemented(self):
  176. assert self.app.AsyncResult('1') != object()
  177. @pytest.mark.usefixtures('depends_on_current_app')
  178. def test_reduce(self):
  179. a1 = self.app.AsyncResult('uuid')
  180. restored = pickle.loads(pickle.dumps(a1))
  181. assert restored.id == 'uuid'
  182. a2 = self.app.AsyncResult('uuid')
  183. assert pickle.loads(pickle.dumps(a2)).id == 'uuid'
  184. def test_maybe_set_cache_empty(self):
  185. self.app.AsyncResult('uuid')._maybe_set_cache(None)
  186. def test_set_cache__children(self):
  187. r1 = self.app.AsyncResult('id1')
  188. r2 = self.app.AsyncResult('id2')
  189. r1._set_cache({'children': [r2.as_tuple()]})
  190. assert r2 in r1.children
  191. def test_successful(self):
  192. ok_res = self.app.AsyncResult(self.task1['id'])
  193. nok_res = self.app.AsyncResult(self.task3['id'])
  194. nok_res2 = self.app.AsyncResult(self.task4['id'])
  195. assert ok_res.successful()
  196. assert not nok_res.successful()
  197. assert not nok_res2.successful()
  198. pending_res = self.app.AsyncResult(uuid())
  199. assert not pending_res.successful()
  200. def test_raising(self):
  201. notb = self.app.AsyncResult(self.task3['id'])
  202. withtb = self.app.AsyncResult(self.task5['id'])
  203. with pytest.raises(KeyError):
  204. notb.get()
  205. try:
  206. withtb.get()
  207. except KeyError:
  208. tb = traceback.format_exc()
  209. assert ' File "foo.py", line 2, in foofunc' not in tb
  210. assert ' File "bar.py", line 3, in barfunc' not in tb
  211. assert 'KeyError:' in tb
  212. assert "'blue'" in tb
  213. else:
  214. raise AssertionError('Did not raise KeyError.')
  215. @skip.unless_module('tblib')
  216. def test_raising_remote_tracebacks(self):
  217. withtb = self.app.AsyncResult(self.task5['id'])
  218. self.app.conf.task_remote_tracebacks = True
  219. try:
  220. withtb.get()
  221. except KeyError:
  222. tb = traceback.format_exc()
  223. assert ' File "foo.py", line 2, in foofunc' in tb
  224. assert ' File "bar.py", line 3, in barfunc' in tb
  225. assert 'KeyError:' in tb
  226. assert "'blue'" in tb
  227. else:
  228. raise AssertionError('Did not raise KeyError.')
  229. def test_str(self):
  230. ok_res = self.app.AsyncResult(self.task1['id'])
  231. ok2_res = self.app.AsyncResult(self.task2['id'])
  232. nok_res = self.app.AsyncResult(self.task3['id'])
  233. assert str(ok_res) == self.task1['id']
  234. assert str(ok2_res) == self.task2['id']
  235. assert str(nok_res) == self.task3['id']
  236. pending_id = uuid()
  237. pending_res = self.app.AsyncResult(pending_id)
  238. assert str(pending_res) == pending_id
  239. def test_repr(self):
  240. ok_res = self.app.AsyncResult(self.task1['id'])
  241. ok2_res = self.app.AsyncResult(self.task2['id'])
  242. nok_res = self.app.AsyncResult(self.task3['id'])
  243. assert repr(ok_res) == '<AsyncResult: %s>' % (self.task1['id'],)
  244. assert repr(ok2_res) == '<AsyncResult: %s>' % (self.task2['id'],)
  245. assert repr(nok_res) == '<AsyncResult: %s>' % (self.task3['id'],)
  246. pending_id = uuid()
  247. pending_res = self.app.AsyncResult(pending_id)
  248. assert repr(pending_res) == '<AsyncResult: %s>' % (pending_id,)
  249. def test_hash(self):
  250. assert (hash(self.app.AsyncResult('x0w991')) ==
  251. hash(self.app.AsyncResult('x0w991')))
  252. assert (hash(self.app.AsyncResult('x0w991')) !=
  253. hash(self.app.AsyncResult('x1w991')))
  254. def test_get_traceback(self):
  255. ok_res = self.app.AsyncResult(self.task1['id'])
  256. nok_res = self.app.AsyncResult(self.task3['id'])
  257. nok_res2 = self.app.AsyncResult(self.task4['id'])
  258. assert not ok_res.traceback
  259. assert nok_res.traceback
  260. assert nok_res2.traceback
  261. pending_res = self.app.AsyncResult(uuid())
  262. assert not pending_res.traceback
  263. def test_get__backend_gives_None(self):
  264. res = self.app.AsyncResult(self.task1['id'])
  265. res.backend.wait_for = Mock(name='wait_for')
  266. res.backend.wait_for.return_value = None
  267. assert res.get() is None
  268. def test_get(self):
  269. ok_res = self.app.AsyncResult(self.task1['id'])
  270. ok2_res = self.app.AsyncResult(self.task2['id'])
  271. nok_res = self.app.AsyncResult(self.task3['id'])
  272. nok2_res = self.app.AsyncResult(self.task4['id'])
  273. callback = Mock(name='callback')
  274. assert ok_res.get(callback=callback) == 'the'
  275. callback.assert_called_with(ok_res.id, 'the')
  276. assert ok2_res.get() == 'quick'
  277. with pytest.raises(KeyError):
  278. nok_res.get()
  279. assert nok_res.get(propagate=False)
  280. assert isinstance(nok2_res.result, KeyError)
  281. assert ok_res.info == 'the'
  282. def test_get_when_ignored(self):
  283. result = self.app.AsyncResult(uuid())
  284. result.ignored = True
  285. # Does not block
  286. assert result.get() is None
  287. def test_eq_ne(self):
  288. r1 = self.app.AsyncResult(self.task1['id'])
  289. r2 = self.app.AsyncResult(self.task1['id'])
  290. r3 = self.app.AsyncResult(self.task2['id'])
  291. assert r1 == r2
  292. assert r1 != r3
  293. assert r1 == r2.id
  294. assert r1 != r3.id
  295. @pytest.mark.usefixtures('depends_on_current_app')
  296. def test_reduce_restore(self):
  297. r1 = self.app.AsyncResult(self.task1['id'])
  298. fun, args = r1.__reduce__()
  299. assert fun(*args) == r1
  300. def test_get_timeout(self):
  301. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  302. with pytest.raises(TimeoutError):
  303. res.get(timeout=0.001)
  304. pending_res = self.app.AsyncResult(uuid())
  305. with patch('celery.result.time') as _time:
  306. with pytest.raises(TimeoutError):
  307. pending_res.get(timeout=0.001, interval=0.001)
  308. _time.sleep.assert_called_with(0.001)
  309. def test_get_timeout_longer(self):
  310. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  311. with patch('celery.result.time') as _time:
  312. with pytest.raises(TimeoutError):
  313. res.get(timeout=1, interval=1)
  314. _time.sleep.assert_called_with(1)
  315. def test_ready(self):
  316. oks = (self.app.AsyncResult(self.task1['id']),
  317. self.app.AsyncResult(self.task2['id']),
  318. self.app.AsyncResult(self.task3['id']))
  319. assert all(result.ready() for result in oks)
  320. assert not self.app.AsyncResult(self.task4['id']).ready()
  321. assert not self.app.AsyncResult(uuid()).ready()
  322. def test_del(self):
  323. with patch('celery.result.AsyncResult.backend') as backend:
  324. result = self.app.AsyncResult(self.task1['id'])
  325. result_clone = copy.copy(result)
  326. del result
  327. assert backend.remove_pending_result.called_once_with(
  328. result_clone
  329. )
  330. result = self.app.AsyncResult(self.task1['id'])
  331. result.backend = None
  332. del result
  333. def test_get_request_meta(self):
  334. x = self.app.AsyncResult('1')
  335. request = Context(
  336. task_name='foo',
  337. children=None,
  338. args=['one', 'two'],
  339. kwargs={'kwarg1': 'three'},
  340. hostname="foo",
  341. retries=1,
  342. delivery_info={'routing_key': 'celery'}
  343. )
  344. x.backend.store_result(task_id="1", result='foo', state=states.SUCCESS,
  345. traceback=None, request=request)
  346. assert x.name == 'foo'
  347. assert x.args == ['one', 'two']
  348. assert x.kwargs == {'kwarg1': 'three'}
  349. assert x.worker == 'foo'
  350. assert x.retries == 1
  351. assert x.queue == 'celery'
  352. assert x.date_done is not None
  353. assert x.task_id == "1"
  354. assert x.state == "SUCCESS"
  355. class test_ResultSet:
  356. def test_resultset_repr(self):
  357. assert repr(self.app.ResultSet(
  358. [self.app.AsyncResult(t) for t in ['1', '2', '3']]))
  359. def test_eq_other(self):
  360. assert self.app.ResultSet([
  361. self.app.AsyncResult(t) for t in [1, 3, 3]]) != 1
  362. rs1 = self.app.ResultSet([self.app.AsyncResult(1)])
  363. rs2 = self.app.ResultSet([self.app.AsyncResult(1)])
  364. assert rs1 == rs2
  365. def test_get(self):
  366. x = self.app.ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]])
  367. b = x.results[0].backend = Mock()
  368. b.supports_native_join = False
  369. x.join_native = Mock()
  370. x.join = Mock()
  371. x.get()
  372. x.join.assert_called()
  373. b.supports_native_join = True
  374. x.get()
  375. x.join_native.assert_called()
  376. def test_eq_ne(self):
  377. g1 = self.app.ResultSet([
  378. self.app.AsyncResult('id1'),
  379. self.app.AsyncResult('id2'),
  380. ])
  381. g2 = self.app.ResultSet([
  382. self.app.AsyncResult('id1'),
  383. self.app.AsyncResult('id2'),
  384. ])
  385. g3 = self.app.ResultSet([
  386. self.app.AsyncResult('id3'),
  387. self.app.AsyncResult('id1'),
  388. ])
  389. assert g1 == g2
  390. assert g1 != g3
  391. assert g1 != object()
  392. def test_takes_app_from_first_task(self):
  393. x = ResultSet([self.app.AsyncResult('id1')])
  394. assert x.app is x.results[0].app
  395. x.app = self.app
  396. assert x.app is self.app
  397. def test_get_empty(self):
  398. x = self.app.ResultSet([])
  399. assert x.supports_native_join is None
  400. x.join = Mock(name='join')
  401. x.get()
  402. x.join.assert_called()
  403. def test_add(self):
  404. x = self.app.ResultSet([self.app.AsyncResult(1)])
  405. x.add(self.app.AsyncResult(2))
  406. assert len(x) == 2
  407. x.add(self.app.AsyncResult(2))
  408. assert len(x) == 2
  409. @contextmanager
  410. def dummy_copy(self):
  411. with patch('celery.result.copy') as copy:
  412. def passt(arg):
  413. return arg
  414. copy.side_effect = passt
  415. yield
  416. def test_iterate_respects_subpolling_interval(self):
  417. r1 = self.app.AsyncResult(uuid())
  418. r2 = self.app.AsyncResult(uuid())
  419. backend = r1.backend = r2.backend = Mock()
  420. backend.subpolling_interval = 10
  421. ready = r1.ready = r2.ready = Mock()
  422. def se(*args, **kwargs):
  423. ready.side_effect = KeyError()
  424. return False
  425. ready.return_value = False
  426. ready.side_effect = se
  427. x = self.app.ResultSet([r1, r2])
  428. with self.dummy_copy():
  429. with patch('celery.result.time') as _time:
  430. with pytest.warns(CPendingDeprecationWarning):
  431. with pytest.raises(KeyError):
  432. list(x.iterate())
  433. _time.sleep.assert_called_with(10)
  434. backend.subpolling_interval = 0
  435. with patch('celery.result.time') as _time:
  436. with pytest.warns(CPendingDeprecationWarning):
  437. with pytest.raises(KeyError):
  438. ready.return_value = False
  439. ready.side_effect = se
  440. list(x.iterate())
  441. _time.sleep.assert_not_called()
  442. def test_times_out(self):
  443. r1 = self.app.AsyncResult(uuid)
  444. r1.ready = Mock()
  445. r1.ready.return_value = False
  446. x = self.app.ResultSet([r1])
  447. with self.dummy_copy():
  448. with patch('celery.result.time'):
  449. with pytest.warns(CPendingDeprecationWarning):
  450. with pytest.raises(TimeoutError):
  451. list(x.iterate(timeout=1))
  452. def test_add_discard(self):
  453. x = self.app.ResultSet([])
  454. x.add(self.app.AsyncResult('1'))
  455. assert self.app.AsyncResult('1') in x.results
  456. x.discard(self.app.AsyncResult('1'))
  457. x.discard(self.app.AsyncResult('1'))
  458. x.discard('1')
  459. assert self.app.AsyncResult('1') not in x.results
  460. x.update([self.app.AsyncResult('2')])
  461. def test_clear(self):
  462. x = self.app.ResultSet([])
  463. r = x.results
  464. x.clear()
  465. assert x.results is r
  466. class MockAsyncResultFailure(AsyncResult):
  467. @property
  468. def result(self):
  469. return KeyError('baz')
  470. @property
  471. def state(self):
  472. return states.FAILURE
  473. def get(self, propagate=True, **kwargs):
  474. if propagate:
  475. raise self.result
  476. return self.result
  477. class MockAsyncResultSuccess(AsyncResult):
  478. forgotten = False
  479. def __init__(self, *args, **kwargs):
  480. self._result = kwargs.pop('result', 42)
  481. super(MockAsyncResultSuccess, self).__init__(*args, **kwargs)
  482. def forget(self):
  483. self.forgotten = True
  484. @property
  485. def result(self):
  486. return self._result
  487. @property
  488. def state(self):
  489. return states.SUCCESS
  490. def get(self, **kwargs):
  491. return self.result
  492. class SimpleBackend(SyncBackendMixin):
  493. ids = []
  494. def __init__(self, ids=[]):
  495. self.ids = ids
  496. def _ensure_not_eager(self):
  497. pass
  498. def get_many(self, *args, **kwargs):
  499. return ((id, {'result': i, 'status': states.SUCCESS})
  500. for i, id in enumerate(self.ids))
  501. class test_GroupResult:
  502. def setup(self):
  503. self.size = 10
  504. self.ts = self.app.GroupResult(
  505. uuid(), make_mock_group(self.app, self.size),
  506. )
  507. @pytest.mark.usefixtures('depends_on_current_app')
  508. def test_is_pickleable(self):
  509. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  510. assert pickle.loads(pickle.dumps(ts)) == ts
  511. ts2 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  512. assert pickle.loads(pickle.dumps(ts2)) == ts2
  513. @pytest.mark.usefixtures('depends_on_current_app')
  514. def test_reduce(self):
  515. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  516. fun, args = ts.__reduce__()
  517. ts2 = fun(*args)
  518. assert ts2.id == ts.id
  519. assert ts == ts2
  520. def test_eq_ne(self):
  521. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  522. ts2 = self.app.GroupResult(ts.id, ts.results)
  523. ts3 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  524. ts4 = self.app.GroupResult(ts.id, [self.app.AsyncResult(uuid())])
  525. assert ts == ts2
  526. assert ts != ts3
  527. assert ts != ts4
  528. assert ts != object()
  529. def test_len(self):
  530. assert len(self.ts) == self.size
  531. def test_eq_other(self):
  532. assert self.ts != 1
  533. def test_eq_with_parent(self):
  534. # GroupResult instances with different .parent are not equal
  535. grp_res = self.app.GroupResult(
  536. uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
  537. parent=self.app.AsyncResult(uuid())
  538. )
  539. grp_res_2 = self.app.GroupResult(grp_res.id, grp_res.results)
  540. assert grp_res != grp_res_2
  541. grp_res_2.parent = self.app.AsyncResult(uuid())
  542. assert grp_res != grp_res_2
  543. grp_res_2.parent = grp_res.parent
  544. assert grp_res == grp_res_2
  545. @pytest.mark.usefixtures('depends_on_current_app')
  546. def test_pickleable(self):
  547. assert pickle.loads(pickle.dumps(self.ts))
  548. def test_iterate_raises(self):
  549. ar = MockAsyncResultFailure(uuid(), app=self.app)
  550. ts = self.app.GroupResult(uuid(), [ar])
  551. with pytest.warns(CPendingDeprecationWarning):
  552. it = ts.iterate()
  553. with pytest.raises(KeyError):
  554. next(it)
  555. def test_forget(self):
  556. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  557. MockAsyncResultSuccess(uuid(), app=self.app)]
  558. ts = self.app.GroupResult(uuid(), subs)
  559. ts.forget()
  560. for sub in subs:
  561. assert sub.forgotten
  562. def test_get_nested_without_native_join(self):
  563. backend = SimpleBackend()
  564. backend.supports_native_join = False
  565. ts = self.app.GroupResult(uuid(), [
  566. MockAsyncResultSuccess(uuid(), result='1.1',
  567. app=self.app, backend=backend),
  568. self.app.GroupResult(uuid(), [
  569. MockAsyncResultSuccess(uuid(), result='2.1',
  570. app=self.app, backend=backend),
  571. self.app.GroupResult(uuid(), [
  572. MockAsyncResultSuccess(uuid(), result='3.1',
  573. app=self.app, backend=backend),
  574. MockAsyncResultSuccess(uuid(), result='3.2',
  575. app=self.app, backend=backend),
  576. ]),
  577. ]),
  578. ])
  579. ts.app.backend = backend
  580. vals = ts.get()
  581. assert vals == [
  582. '1.1',
  583. [
  584. '2.1',
  585. [
  586. '3.1',
  587. '3.2',
  588. ]
  589. ],
  590. ]
  591. def test_getitem(self):
  592. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  593. MockAsyncResultSuccess(uuid(), app=self.app)]
  594. ts = self.app.GroupResult(uuid(), subs)
  595. assert ts[0] is subs[0]
  596. def test_save_restore(self):
  597. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  598. MockAsyncResultSuccess(uuid(), app=self.app)]
  599. ts = self.app.GroupResult(uuid(), subs)
  600. ts.save()
  601. with pytest.raises(AttributeError):
  602. ts.save(backend=object())
  603. assert self.app.GroupResult.restore(ts.id).results == ts.results
  604. ts.delete()
  605. assert self.app.GroupResult.restore(ts.id) is None
  606. with pytest.raises(AttributeError):
  607. self.app.GroupResult.restore(ts.id, backend=object())
  608. def test_save_restore_empty(self):
  609. subs = []
  610. ts = self.app.GroupResult(uuid(), subs)
  611. ts.save()
  612. assert isinstance(
  613. self.app.GroupResult.restore(ts.id),
  614. self.app.GroupResult,
  615. )
  616. assert self.app.GroupResult.restore(ts.id).results == ts.results == []
  617. def test_restore_app(self):
  618. subs = [MockAsyncResultSuccess(uuid(), app=self.app)]
  619. ts = self.app.GroupResult(uuid(), subs)
  620. ts.save()
  621. restored = GroupResult.restore(ts.id, app=self.app)
  622. assert restored.id == ts.id
  623. def test_restore_current_app_fallback(self):
  624. subs = [MockAsyncResultSuccess(uuid(), app=self.app)]
  625. ts = self.app.GroupResult(uuid(), subs)
  626. ts.save()
  627. with pytest.raises(RuntimeError,
  628. message="Test depends on current_app"):
  629. GroupResult.restore(ts.id)
  630. def test_join_native(self):
  631. backend = SimpleBackend()
  632. results = [self.app.AsyncResult(uuid(), backend=backend)
  633. for i in range(10)]
  634. ts = self.app.GroupResult(uuid(), results)
  635. ts.app.backend = backend
  636. backend.ids = [result.id for result in results]
  637. res = ts.join_native()
  638. assert res == list(range(10))
  639. callback = Mock(name='callback')
  640. assert not ts.join_native(callback=callback)
  641. callback.assert_has_calls([
  642. call(r.id, i) for i, r in enumerate(ts.results)
  643. ])
  644. def test_join_native_raises(self):
  645. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  646. ts.iter_native = Mock()
  647. ts.iter_native.return_value = iter([
  648. (uuid(), {'status': states.FAILURE, 'result': KeyError()})
  649. ])
  650. with pytest.raises(KeyError):
  651. ts.join_native(propagate=True)
  652. def test_failed_join_report(self):
  653. res = Mock()
  654. ts = self.app.GroupResult(uuid(), [res])
  655. res.state = states.FAILURE
  656. res.backend.is_cached.return_value = True
  657. assert next(ts._failed_join_report()) is res
  658. res.backend.is_cached.return_value = False
  659. with pytest.raises(StopIteration):
  660. next(ts._failed_join_report())
  661. def test_repr(self):
  662. assert repr(
  663. self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]))
  664. def test_children_is_results(self):
  665. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  666. assert ts.children is ts.results
  667. def test_iter_native(self):
  668. backend = SimpleBackend()
  669. results = [self.app.AsyncResult(uuid(), backend=backend)
  670. for i in range(10)]
  671. ts = self.app.GroupResult(uuid(), results)
  672. ts.app.backend = backend
  673. backend.ids = [result.id for result in results]
  674. assert len(list(ts.iter_native())) == 10
  675. def test_iterate_yields(self):
  676. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  677. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  678. ts = self.app.GroupResult(uuid(), [ar, ar2])
  679. with pytest.warns(CPendingDeprecationWarning):
  680. it = ts.iterate()
  681. assert next(it) == 42
  682. assert next(it) == 42
  683. def test_iterate_eager(self):
  684. ar1 = EagerResult(uuid(), 42, states.SUCCESS)
  685. ar2 = EagerResult(uuid(), 42, states.SUCCESS)
  686. ts = self.app.GroupResult(uuid(), [ar1, ar2])
  687. with pytest.warns(CPendingDeprecationWarning):
  688. it = ts.iterate()
  689. assert next(it) == 42
  690. assert next(it) == 42
  691. def test_join_timeout(self):
  692. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  693. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  694. ar3 = self.app.AsyncResult(uuid())
  695. ts = self.app.GroupResult(uuid(), [ar, ar2, ar3])
  696. with pytest.raises(TimeoutError):
  697. ts.join(timeout=0.0000001)
  698. ar4 = self.app.AsyncResult(uuid())
  699. ar4.get = Mock()
  700. ts2 = self.app.GroupResult(uuid(), [ar4])
  701. assert ts2.join(timeout=0.1)
  702. callback = Mock(name='callback')
  703. assert not ts2.join(timeout=0.1, callback=callback)
  704. callback.assert_called_with(ar4.id, ar4.get())
  705. def test_iter_native_when_empty_group(self):
  706. ts = self.app.GroupResult(uuid(), [])
  707. assert list(ts.iter_native()) == []
  708. def test_iterate_simple(self):
  709. with pytest.warns(CPendingDeprecationWarning):
  710. it = self.ts.iterate()
  711. results = sorted(list(it))
  712. assert results == list(range(self.size))
  713. def test___iter__(self):
  714. assert list(iter(self.ts)) == self.ts.results
  715. def test_join(self):
  716. joined = self.ts.join()
  717. assert joined == list(range(self.size))
  718. def test_successful(self):
  719. assert self.ts.successful()
  720. def test_failed(self):
  721. assert not self.ts.failed()
  722. def test_maybe_throw(self):
  723. self.ts.results = [Mock(name='r1')]
  724. self.ts.maybe_throw()
  725. self.ts.results[0].maybe_throw.assert_called_with(
  726. callback=None, propagate=True,
  727. )
  728. def test_join__on_message(self):
  729. with pytest.raises(ImproperlyConfigured):
  730. self.ts.join(on_message=Mock())
  731. def test_waiting(self):
  732. assert not self.ts.waiting()
  733. def test_ready(self):
  734. assert self.ts.ready()
  735. def test_completed_count(self):
  736. assert self.ts.completed_count() == len(self.ts)
  737. class test_pending_AsyncResult:
  738. def test_result(self, app):
  739. res = app.AsyncResult(uuid())
  740. assert res.result is None
  class test_failed_AsyncResult:
      """Group behaviour when one member result is in the FAILURE state."""

      def setup(self):
          """Build a group from a mock group plus one stored failed task."""
          self.size = 11
          self.app.conf.result_serializer = 'pickle'
          # presumably ten mock results -- verify against make_mock_group
          members = make_mock_group(self.app, 10)
          failed_task = mock_task('ts11', states.FAILURE, KeyError('Baz'))
          save_result(self.app, failed_task)
          failed_result = self.app.AsyncResult(failed_task['id'])
          self.ts = self.app.GroupResult(uuid(), members + [failed_result])

      def test_completed_count(self):
          """All members but one count as completed."""
          expected = len(self.ts) - 1
          assert self.ts.completed_count() == expected

      def test_iterate_simple(self):
          """Consuming iterate() raises the stored KeyError."""
          with pytest.warns(CPendingDeprecationWarning):
              iterator = self.ts.iterate()
          # The error is expected while the iterator is being drained.
          with pytest.raises(KeyError):
              list(iterator)

      def test_join(self):
          """join() raises the KeyError of the failed member."""
          with pytest.raises(KeyError):
              self.ts.join()

      def test_successful(self):
          """The group is not successful."""
          outcome = self.ts.successful()
          assert not outcome

      def test_failed(self):
          """The group reports failure."""
          has_failed = self.ts.failed()
          assert has_failed
  class test_pending_Group:
      """Group behaviour when none of its member results has been stored."""

      def setup(self):
          """Create a group of two results built from fresh uuids."""
          members = [self.app.AsyncResult(uuid()) for _ in range(2)]
          self.ts = self.app.GroupResult(uuid(), members)

      def test_completed_count(self):
          """Nothing has completed."""
          completed = self.ts.completed_count()
          assert completed == 0

      def test_ready(self):
          """The group is not ready."""
          is_ready = self.ts.ready()
          assert not is_ready

      def test_waiting(self):
          """The group is still waiting."""
          still_waiting = self.ts.waiting()
          assert still_waiting

      def test_join(self):
          """join() with a very short timeout raises TimeoutError."""
          short_timeout = 0.001
          with pytest.raises(TimeoutError):
              self.ts.join(timeout=short_timeout)

      def test_join_longer(self):
          """join() with a one-second timeout also raises TimeoutError."""
          long_timeout = 1
          with pytest.raises(TimeoutError):
              self.ts.join(timeout=long_timeout)
  class test_EagerResult:
      """Tests for EagerResult objects."""

      def setup(self):
          """Register a non-shared task that always raises KeyError(x, y)."""
          @self.app.task(shared=False)
          def raising(x, y):
              raise KeyError(x, y)
          self.raising = raising

      def test_wait_raises(self):
          """wait() re-raises the task error unless propagate is False."""
          res = self.raising.apply(args=[3, 3])
          with pytest.raises(KeyError):
              res.wait()
          assert res.wait(propagate=False)

      def test_wait(self):
          """wait() on a RETRY result leaves state and status as RETRY."""
          res = EagerResult('x', 'x', states.RETRY)
          res.wait()
          assert res.state == states.RETRY
          assert res.status == states.RETRY

      def test_forget(self):
          """forget() can be called on an eager result (no assertions)."""
          res = EagerResult('x', 'x', states.RETRY)
          res.forget()

      def test_revoke(self):
          """revoke() returns a falsy value for an eager result."""
          res = self.raising.apply(args=[3, 3])
          assert not res.revoke()

      @patch('celery.result.task_join_will_block')
      def test_get_sync_subtask_option(self, task_join_will_block):
          """get() raises RuntimeError when a join would block, unless
          ``disable_sync_subtasks=False`` is passed."""
          task_join_will_block.return_value = True
          tid = uuid()
          # The third positional argument is the state (see test_wait), so
          # the previous four-argument call created a result whose state was
          # 'x' rather than SUCCESS.
          # NOTE(review): confirm no caller relied on the odd 'x' state.
          res_subtask_async = EagerResult(tid, 'x', states.SUCCESS)
          with pytest.raises(RuntimeError):
              res_subtask_async.get()
          res_subtask_async.get(disable_sync_subtasks=False)
  813. class test_tuples:
  def test_AsyncResult(self):
      """An AsyncResult survives a round trip through result_from_tuple()."""
      x = self.app.AsyncResult(uuid())
      # The previous ``assert x, result_from_tuple(... == self.app)`` only
      # checked that ``x`` was truthy: the second expression was the assert
      # message and was never evaluated. Compare the values for real.
      assert x == result_from_tuple(x.as_tuple(), self.app)
      # NOTE(review): relies on result_from_tuple() accepting an existing
      # result object and handing back an equal result -- confirm.
      assert x == result_from_tuple(x, self.app)
  def test_with_parent(self):
      """The parent link is restored as an AsyncResult after a round trip."""
      child = self.app.AsyncResult(uuid())
      child.parent = self.app.AsyncResult(uuid())
      restored = result_from_tuple(child.as_tuple(), self.app)
      assert restored == child
      assert restored.parent == child.parent
      assert isinstance(restored.parent, AsyncResult)
  def test_compat(self):
      """A two-item ``[id, []]`` list is accepted and keeps its id."""
      task_id = uuid()
      legacy_form = [task_id, []]
      restored = result_from_tuple(legacy_form, app=self.app)
      assert restored.id == task_id
  def test_GroupResult(self):
      """A GroupResult survives a round trip through result_from_tuple()."""
      x = self.app.GroupResult(
          uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
      )
      # The previous ``assert x, result_from_tuple(... == self.app)`` only
      # checked that ``x`` was truthy: the second expression was the assert
      # message and was never evaluated. Compare the values for real.
      assert x == result_from_tuple(x.as_tuple(), self.app)
      # NOTE(review): relies on result_from_tuple() accepting an existing
      # result object and handing back an equal result -- confirm.
      assert x == result_from_tuple(x, self.app)
  def test_GroupResult_with_parent(self):
      """A group's parent is preserved by as_tuple()/result_from_tuple()."""
      parent = self.app.AsyncResult(uuid())
      members = [self.app.AsyncResult(uuid()) for _ in range(10)]
      original = self.app.GroupResult(uuid(), members, parent)
      restored = result_from_tuple(original.as_tuple(), self.app)
      assert restored == original
      assert restored.parent == parent
  def test_GroupResult_as_tuple(self):
      """as_tuple() of a group yields ((id, parent_tuple), [child tuples])."""
      parent = self.app.AsyncResult(uuid())
      child_ids = ['async-result-{}'.format(i) for i in range(2)]
      children = [self.app.AsyncResult(child_id) for child_id in child_ids]
      result = self.app.GroupResult('group-result-1', children, parent)

      header, group_results = result.as_tuple()
      result_id, parent_tuple = header

      # Header carries the group's own id and the serialized parent.
      assert result_id == result.id
      assert parent_tuple == parent.as_tuple()
      assert parent_tuple[0][0] == parent.id

      # Each child serializes to ((id, None), None).
      assert isinstance(group_results, list)
      expected_grp_res = [((child_id, None), None) for child_id in child_ids]
      assert group_results == expected_grp_res