test_result.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774
  1. import pytest
  2. import traceback
  3. from contextlib import contextmanager
  4. from case import Mock, call, patch, skip
  5. from celery import uuid
  6. from celery import states
  7. from celery.backends.base import SyncBackendMixin
  8. from celery.exceptions import (
  9. ImproperlyConfigured, IncompleteStream, TimeoutError,
  10. )
  11. from celery.result import (
  12. AsyncResult,
  13. EagerResult,
  14. ResultSet,
  15. result_from_tuple,
  16. assert_will_not_block,
  17. )
  18. from celery.utils.serialization import pickle
  19. PYTRACEBACK = """\
  20. Traceback (most recent call last):
  21. File "foo.py", line 2, in foofunc
  22. don't matter
  23. File "bar.py", line 3, in barfunc
  24. don't matter
  25. Doesn't matter: really!\
  26. """
  27. def mock_task(name, state, result, traceback=None):
  28. return dict(
  29. id=uuid(), name=name, state=state,
  30. result=result, traceback=traceback,
  31. )
  32. def save_result(app, task):
  33. traceback = task.get('traceback') or 'Some traceback'
  34. if task['state'] == states.SUCCESS:
  35. app.backend.mark_as_done(task['id'], task['result'])
  36. elif task['state'] == states.RETRY:
  37. app.backend.mark_as_retry(
  38. task['id'], task['result'], traceback=traceback,
  39. )
  40. else:
  41. app.backend.mark_as_failure(
  42. task['id'], task['result'], traceback=traceback,
  43. )
  44. def make_mock_group(app, size=10):
  45. tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)]
  46. [save_result(app, task) for task in tasks]
  47. return [app.AsyncResult(task['id']) for task in tasks]
  48. class test_AsyncResult:
  49. def setup(self):
  50. self.app.conf.result_cache_max = 100
  51. self.app.conf.result_serializer = 'pickle'
  52. self.task1 = mock_task('task1', states.SUCCESS, 'the')
  53. self.task2 = mock_task('task2', states.SUCCESS, 'quick')
  54. self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))
  55. self.task4 = mock_task('task3', states.RETRY, KeyError('red'))
  56. self.task5 = mock_task(
  57. 'task3', states.FAILURE, KeyError('blue'), PYTRACEBACK,
  58. )
  59. for task in (self.task1, self.task2,
  60. self.task3, self.task4, self.task5):
  61. save_result(self.app, task)
  62. @self.app.task(shared=False)
  63. def mytask():
  64. pass
  65. self.mytask = mytask
  66. @patch('celery.result.task_join_will_block')
  67. def test_assert_will_not_block(self, task_join_will_block):
  68. task_join_will_block.return_value = True
  69. with pytest.raises(RuntimeError):
  70. assert_will_not_block()
  71. task_join_will_block.return_value = False
  72. assert_will_not_block()
  73. def test_without_id(self):
  74. with pytest.raises(ValueError):
  75. AsyncResult(None, app=self.app)
  76. @pytest.mark.usefixtures('depends_on_current_app')
  77. def test_reduce_direct(self):
  78. x = AsyncResult('1', app=self.app)
  79. fun, args = x.__reduce__()
  80. assert fun(*args) == x
  81. def test_children(self):
  82. x = self.app.AsyncResult('1')
  83. children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  84. x._cache = {'children': children, 'status': states.SUCCESS}
  85. x.backend = Mock()
  86. assert x.children
  87. assert len(x.children) == 3
  88. def test_propagates_for_parent(self):
  89. x = self.app.AsyncResult(uuid())
  90. x.backend = Mock(name='backend')
  91. x.backend.get_task_meta.return_value = {}
  92. x.backend.wait_for_pending.return_value = 84
  93. x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
  94. with pytest.raises(KeyError):
  95. x.get(propagate=True)
  96. x.backend.wait_for_pending.assert_not_called()
  97. x.parent = EagerResult(uuid(), 42, states.SUCCESS)
  98. assert x.get(propagate=True) == 84
  99. x.backend.wait_for_pending.assert_called()
  100. def test_get_children(self):
  101. tid = uuid()
  102. x = self.app.AsyncResult(tid)
  103. child = [self.app.AsyncResult(uuid()).as_tuple()
  104. for i in range(10)]
  105. x._cache = {'children': child}
  106. assert x.children
  107. assert len(x.children) == 10
  108. x._cache = {'status': states.SUCCESS}
  109. x.backend._cache[tid] = {'result': None}
  110. assert x.children is None
  111. def test_build_graph_get_leaf_collect(self):
  112. x = self.app.AsyncResult('1')
  113. x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
  114. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  115. x.iterdeps = Mock()
  116. x.iterdeps.return_value = (
  117. (None, x),
  118. (x, c[0]),
  119. (c[0], c[1]),
  120. (c[1], c[2])
  121. )
  122. x.backend.READY_STATES = states.READY_STATES
  123. assert x.graph
  124. assert x.get_leaf() is 2
  125. it = x.collect()
  126. assert list(it) == [
  127. (x, None),
  128. (c[0], 0),
  129. (c[1], 1),
  130. (c[2], 2),
  131. ]
  132. def test_iterdeps(self):
  133. x = self.app.AsyncResult('1')
  134. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  135. x._cache = {'status': states.SUCCESS, 'result': None, 'children': c}
  136. for child in c:
  137. child.backend = Mock()
  138. child.backend.get_children.return_value = []
  139. it = x.iterdeps()
  140. assert list(it) == [
  141. (None, x),
  142. (x, c[0]),
  143. (x, c[1]),
  144. (x, c[2]),
  145. ]
  146. x._cache = None
  147. x.ready = Mock()
  148. x.ready.return_value = False
  149. with pytest.raises(IncompleteStream):
  150. list(x.iterdeps())
  151. list(x.iterdeps(intermediate=True))
  152. def test_eq_not_implemented(self):
  153. assert self.app.AsyncResult('1') != object()
  154. @pytest.mark.usefixtures('depends_on_current_app')
  155. def test_reduce(self):
  156. a1 = self.app.AsyncResult('uuid')
  157. restored = pickle.loads(pickle.dumps(a1))
  158. assert restored.id == 'uuid'
  159. a2 = self.app.AsyncResult('uuid')
  160. assert pickle.loads(pickle.dumps(a2)).id == 'uuid'
  161. def test_maybe_set_cache_empty(self):
  162. self.app.AsyncResult('uuid')._maybe_set_cache(None)
  163. def test_set_cache__children(self):
  164. r1 = self.app.AsyncResult('id1')
  165. r2 = self.app.AsyncResult('id2')
  166. r1._set_cache({'children': [r2.as_tuple()]})
  167. assert r2 in r1.children
  168. def test_successful(self):
  169. ok_res = self.app.AsyncResult(self.task1['id'])
  170. nok_res = self.app.AsyncResult(self.task3['id'])
  171. nok_res2 = self.app.AsyncResult(self.task4['id'])
  172. assert ok_res.successful()
  173. assert not nok_res.successful()
  174. assert not nok_res2.successful()
  175. pending_res = self.app.AsyncResult(uuid())
  176. assert not pending_res.successful()
  177. def test_raising(self):
  178. notb = self.app.AsyncResult(self.task3['id'])
  179. withtb = self.app.AsyncResult(self.task5['id'])
  180. with pytest.raises(KeyError):
  181. notb.get()
  182. try:
  183. withtb.get()
  184. except KeyError:
  185. tb = traceback.format_exc()
  186. assert ' File "foo.py", line 2, in foofunc' not in tb
  187. assert ' File "bar.py", line 3, in barfunc' not in tb
  188. assert 'KeyError:' in tb
  189. assert "'blue'" in tb
  190. else:
  191. raise AssertionError('Did not raise KeyError.')
  192. @skip.unless_module('tblib')
  193. def test_raising_remote_tracebacks(self):
  194. withtb = self.app.AsyncResult(self.task5['id'])
  195. self.app.conf.task_remote_tracebacks = True
  196. try:
  197. withtb.get()
  198. except KeyError:
  199. tb = traceback.format_exc()
  200. assert ' File "foo.py", line 2, in foofunc' in tb
  201. assert ' File "bar.py", line 3, in barfunc' in tb
  202. assert 'KeyError:' in tb
  203. assert "'blue'" in tb
  204. else:
  205. raise AssertionError('Did not raise KeyError.')
  206. def test_str(self):
  207. ok_res = self.app.AsyncResult(self.task1['id'])
  208. ok2_res = self.app.AsyncResult(self.task2['id'])
  209. nok_res = self.app.AsyncResult(self.task3['id'])
  210. assert str(ok_res) == self.task1['id']
  211. assert str(ok2_res) == self.task2['id']
  212. assert str(nok_res) == self.task3['id']
  213. pending_id = uuid()
  214. pending_res = self.app.AsyncResult(pending_id)
  215. assert str(pending_res) == pending_id
  216. def test_repr(self):
  217. ok_res = self.app.AsyncResult(self.task1['id'])
  218. ok2_res = self.app.AsyncResult(self.task2['id'])
  219. nok_res = self.app.AsyncResult(self.task3['id'])
  220. assert repr(ok_res) == '<AsyncResult: %s>' % (self.task1['id'],)
  221. assert repr(ok2_res) == '<AsyncResult: %s>' % (self.task2['id'],)
  222. assert repr(nok_res) == '<AsyncResult: %s>' % (self.task3['id'],)
  223. pending_id = uuid()
  224. pending_res = self.app.AsyncResult(pending_id)
  225. assert repr(pending_res) == '<AsyncResult: %s>' % (pending_id,)
  226. def test_hash(self):
  227. assert (hash(self.app.AsyncResult('x0w991')) ==
  228. hash(self.app.AsyncResult('x0w991')))
  229. assert (hash(self.app.AsyncResult('x0w991')) !=
  230. hash(self.app.AsyncResult('x1w991')))
  231. def test_get_traceback(self):
  232. ok_res = self.app.AsyncResult(self.task1['id'])
  233. nok_res = self.app.AsyncResult(self.task3['id'])
  234. nok_res2 = self.app.AsyncResult(self.task4['id'])
  235. assert not ok_res.traceback
  236. assert nok_res.traceback
  237. assert nok_res2.traceback
  238. pending_res = self.app.AsyncResult(uuid())
  239. assert not pending_res.traceback
  240. def test_get__backend_gives_None(self):
  241. res = self.app.AsyncResult(self.task1['id'])
  242. res.backend.wait_for = Mock(name='wait_for')
  243. res.backend.wait_for.return_value = None
  244. assert res.get() is None
  245. def test_get(self):
  246. ok_res = self.app.AsyncResult(self.task1['id'])
  247. ok2_res = self.app.AsyncResult(self.task2['id'])
  248. nok_res = self.app.AsyncResult(self.task3['id'])
  249. nok2_res = self.app.AsyncResult(self.task4['id'])
  250. callback = Mock(name='callback')
  251. assert ok_res.get(callback=callback) == 'the'
  252. callback.assert_called_with(ok_res.id, 'the')
  253. assert ok2_res.get() == 'quick'
  254. with pytest.raises(KeyError):
  255. nok_res.get()
  256. assert nok_res.get(propagate=False)
  257. assert isinstance(nok2_res.result, KeyError)
  258. assert ok_res.info == 'the'
  259. def test_eq_ne(self):
  260. r1 = self.app.AsyncResult(self.task1['id'])
  261. r2 = self.app.AsyncResult(self.task1['id'])
  262. r3 = self.app.AsyncResult(self.task2['id'])
  263. assert r1 == r2
  264. assert r1 != r3
  265. assert r1 == r2.id
  266. assert r1 != r3.id
  267. @pytest.mark.usefixtures('depends_on_current_app')
  268. def test_reduce_restore(self):
  269. r1 = self.app.AsyncResult(self.task1['id'])
  270. fun, args = r1.__reduce__()
  271. assert fun(*args) == r1
  272. def test_get_timeout(self):
  273. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  274. with pytest.raises(TimeoutError):
  275. res.get(timeout=0.001)
  276. pending_res = self.app.AsyncResult(uuid())
  277. with patch('celery.result.time') as _time:
  278. with pytest.raises(TimeoutError):
  279. pending_res.get(timeout=0.001, interval=0.001)
  280. _time.sleep.assert_called_with(0.001)
  281. def test_get_timeout_longer(self):
  282. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  283. with patch('celery.result.time') as _time:
  284. with pytest.raises(TimeoutError):
  285. res.get(timeout=1, interval=1)
  286. _time.sleep.assert_called_with(1)
  287. def test_ready(self):
  288. oks = (self.app.AsyncResult(self.task1['id']),
  289. self.app.AsyncResult(self.task2['id']),
  290. self.app.AsyncResult(self.task3['id']))
  291. assert all(result.ready() for result in oks)
  292. assert not self.app.AsyncResult(self.task4['id']).ready()
  293. assert not self.app.AsyncResult(uuid()).ready()
  294. class test_ResultSet:
  295. def test_resultset_repr(self):
  296. assert repr(self.app.ResultSet(
  297. [self.app.AsyncResult(t) for t in ['1', '2', '3']]))
  298. def test_eq_other(self):
  299. assert self.app.ResultSet([
  300. self.app.AsyncResult(t) for t in [1, 3, 3]]) != 1
  301. rs1 = self.app.ResultSet([self.app.AsyncResult(1)])
  302. rs2 = self.app.ResultSet([self.app.AsyncResult(1)])
  303. assert rs1 == rs2
  304. def test_get(self):
  305. x = self.app.ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]])
  306. b = x.results[0].backend = Mock()
  307. b.supports_native_join = False
  308. x.join_native = Mock()
  309. x.join = Mock()
  310. x.get()
  311. x.join.assert_called()
  312. b.supports_native_join = True
  313. x.get()
  314. x.join_native.assert_called()
  315. def test_eq_ne(self):
  316. g1 = self.app.ResultSet([
  317. self.app.AsyncResult('id1'),
  318. self.app.AsyncResult('id2'),
  319. ])
  320. g2 = self.app.ResultSet([
  321. self.app.AsyncResult('id1'),
  322. self.app.AsyncResult('id2'),
  323. ])
  324. g3 = self.app.ResultSet([
  325. self.app.AsyncResult('id3'),
  326. self.app.AsyncResult('id1'),
  327. ])
  328. assert g1 == g2
  329. assert g1 != g3
  330. assert g1 != object()
  331. def test_takes_app_from_first_task(self):
  332. x = ResultSet([self.app.AsyncResult('id1')])
  333. assert x.app is x.results[0].app
  334. x.app = self.app
  335. assert x.app is self.app
  336. def test_get_empty(self):
  337. x = self.app.ResultSet([])
  338. assert x.supports_native_join is None
  339. x.join = Mock(name='join')
  340. x.get()
  341. x.join.assert_called()
  342. def test_add(self):
  343. x = self.app.ResultSet([self.app.AsyncResult(1)])
  344. x.add(self.app.AsyncResult(2))
  345. assert len(x) == 2
  346. x.add(self.app.AsyncResult(2))
  347. assert len(x) == 2
  348. @contextmanager
  349. def dummy_copy(self):
  350. with patch('celery.result.copy') as copy:
  351. def passt(arg):
  352. return arg
  353. copy.side_effect = passt
  354. yield
  355. def test_add_discard(self):
  356. x = self.app.ResultSet([])
  357. x.add(self.app.AsyncResult('1'))
  358. assert self.app.AsyncResult('1') in x.results
  359. x.discard(self.app.AsyncResult('1'))
  360. x.discard(self.app.AsyncResult('1'))
  361. x.discard('1')
  362. assert self.app.AsyncResult('1') not in x.results
  363. x.update([self.app.AsyncResult('2')])
  364. def test_clear(self):
  365. x = self.app.ResultSet([])
  366. r = x.results
  367. x.clear()
  368. assert x.results is r
  369. class MockAsyncResultFailure(AsyncResult):
  370. @property
  371. def result(self):
  372. return KeyError('baz')
  373. @property
  374. def state(self):
  375. return states.FAILURE
  376. def get(self, propagate=True, **kwargs):
  377. if propagate:
  378. raise self.result
  379. return self.result
  380. class MockAsyncResultSuccess(AsyncResult):
  381. forgotten = False
  382. def forget(self):
  383. self.forgotten = True
  384. @property
  385. def result(self):
  386. return 42
  387. @property
  388. def state(self):
  389. return states.SUCCESS
  390. def get(self, **kwargs):
  391. return self.result
  392. class SimpleBackend(SyncBackendMixin):
  393. ids = []
  394. def __init__(self, ids=[]):
  395. self.ids = ids
  396. def _ensure_not_eager(self):
  397. pass
  398. def get_many(self, *args, **kwargs):
  399. return ((id, {'result': i, 'status': states.SUCCESS})
  400. for i, id in enumerate(self.ids))
  401. class test_GroupResult:
  402. def setup(self):
  403. self.size = 10
  404. self.ts = self.app.GroupResult(
  405. uuid(), make_mock_group(self.app, self.size),
  406. )
  407. @pytest.mark.usefixtures('depends_on_current_app')
  408. def test_is_pickleable(self):
  409. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  410. assert pickle.loads(pickle.dumps(ts)) == ts
  411. ts2 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  412. assert pickle.loads(pickle.dumps(ts2)) == ts2
  413. @pytest.mark.usefixtures('depends_on_current_app')
  414. def test_reduce(self):
  415. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  416. fun, args = ts.__reduce__()
  417. ts2 = fun(*args)
  418. assert ts2.id == ts.id
  419. assert ts == ts2
  420. def test_eq_ne(self):
  421. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  422. ts2 = self.app.GroupResult(ts.id, ts.results)
  423. ts3 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  424. ts4 = self.app.GroupResult(ts.id, [self.app.AsyncResult(uuid())])
  425. assert ts == ts2
  426. assert ts != ts3
  427. assert ts != ts4
  428. assert ts != object()
  429. def test_len(self):
  430. assert len(self.ts) == self.size
  431. def test_eq_other(self):
  432. assert self.ts != 1
  433. @pytest.mark.usefixtures('depends_on_current_app')
  434. def test_pickleable(self):
  435. assert pickle.loads(pickle.dumps(self.ts))
  436. def test_forget(self):
  437. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  438. MockAsyncResultSuccess(uuid(), app=self.app)]
  439. ts = self.app.GroupResult(uuid(), subs)
  440. ts.forget()
  441. for sub in subs:
  442. assert sub.forgotten
  443. def test_getitem(self):
  444. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  445. MockAsyncResultSuccess(uuid(), app=self.app)]
  446. ts = self.app.GroupResult(uuid(), subs)
  447. assert ts[0] is subs[0]
  448. def test_save_restore(self):
  449. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  450. MockAsyncResultSuccess(uuid(), app=self.app)]
  451. ts = self.app.GroupResult(uuid(), subs)
  452. ts.save()
  453. with pytest.raises(AttributeError):
  454. ts.save(backend=object())
  455. assert self.app.GroupResult.restore(ts.id).results == ts.results
  456. ts.delete()
  457. assert self.app.GroupResult.restore(ts.id) is None
  458. with pytest.raises(AttributeError):
  459. self.app.GroupResult.restore(ts.id, backend=object())
  460. def test_join_native(self):
  461. backend = SimpleBackend()
  462. results = [self.app.AsyncResult(uuid(), backend=backend)
  463. for i in range(10)]
  464. ts = self.app.GroupResult(uuid(), results)
  465. ts.app.backend = backend
  466. backend.ids = [result.id for result in results]
  467. res = ts.join_native()
  468. assert res == list(range(10))
  469. callback = Mock(name='callback')
  470. assert not ts.join_native(callback=callback)
  471. callback.assert_has_calls([
  472. call(r.id, i) for i, r in enumerate(ts.results)
  473. ])
  474. def test_join_native_raises(self):
  475. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  476. ts.iter_native = Mock()
  477. ts.iter_native.return_value = iter([
  478. (uuid(), {'status': states.FAILURE, 'result': KeyError()})
  479. ])
  480. with pytest.raises(KeyError):
  481. ts.join_native(propagate=True)
  482. def test_failed_join_report(self):
  483. res = Mock()
  484. ts = self.app.GroupResult(uuid(), [res])
  485. res.state = states.FAILURE
  486. res.backend.is_cached.return_value = True
  487. assert next(ts._failed_join_report()) is res
  488. res.backend.is_cached.return_value = False
  489. with pytest.raises(StopIteration):
  490. next(ts._failed_join_report())
  491. def test_repr(self):
  492. assert repr(
  493. self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]))
  494. def test_children_is_results(self):
  495. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  496. assert ts.children is ts.results
  497. def test_iter_native(self):
  498. backend = SimpleBackend()
  499. results = [self.app.AsyncResult(uuid(), backend=backend)
  500. for i in range(10)]
  501. ts = self.app.GroupResult(uuid(), results)
  502. ts.app.backend = backend
  503. backend.ids = [result.id for result in results]
  504. assert len(list(ts.iter_native())) == 10
  505. def test_join_timeout(self):
  506. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  507. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  508. ar3 = self.app.AsyncResult(uuid())
  509. ts = self.app.GroupResult(uuid(), [ar, ar2, ar3])
  510. with pytest.raises(TimeoutError):
  511. ts.join(timeout=0.0000001)
  512. ar4 = self.app.AsyncResult(uuid())
  513. ar4.get = Mock()
  514. ts2 = self.app.GroupResult(uuid(), [ar4])
  515. assert ts2.join(timeout=0.1)
  516. callback = Mock(name='callback')
  517. assert not ts2.join(timeout=0.1, callback=callback)
  518. callback.assert_called_with(ar4.id, ar4.get())
  519. def test_iter_native_when_empty_group(self):
  520. ts = self.app.GroupResult(uuid(), [])
  521. assert list(ts.iter_native()) == []
  522. def test___iter__(self):
  523. assert list(iter(self.ts)) == self.ts.results
  524. def test_join(self):
  525. joined = self.ts.join()
  526. assert joined == list(range(self.size))
  527. def test_successful(self):
  528. assert self.ts.successful()
  529. def test_failed(self):
  530. assert not self.ts.failed()
  531. def test_maybe_throw(self):
  532. self.ts.results = [Mock(name='r1')]
  533. self.ts.maybe_throw()
  534. self.ts.results[0].maybe_throw.assert_called_with(
  535. callback=None, propagate=True,
  536. )
  537. def test_join__on_message(self):
  538. with pytest.raises(ImproperlyConfigured):
  539. self.ts.join(on_message=Mock())
  540. def test_waiting(self):
  541. assert not self.ts.waiting()
  542. def test_ready(self):
  543. assert self.ts.ready()
  544. def test_completed_count(self):
  545. assert self.ts.completed_count() == len(self.ts)
  546. class test_pending_AsyncResult:
  547. def test_result(self, app):
  548. res = app.AsyncResult(uuid())
  549. assert res.result is None
  550. class test_failed_AsyncResult:
  551. def setup(self):
  552. self.size = 11
  553. self.app.conf.result_serializer = 'pickle'
  554. results = make_mock_group(self.app, 10)
  555. failed = mock_task('ts11', states.FAILURE, KeyError('Baz'))
  556. save_result(self.app, failed)
  557. failed_res = self.app.AsyncResult(failed['id'])
  558. self.ts = self.app.GroupResult(uuid(), results + [failed_res])
  559. def test_completed_count(self):
  560. assert self.ts.completed_count() == len(self.ts) - 1
  561. def test_join(self):
  562. with pytest.raises(KeyError):
  563. self.ts.join()
  564. def test_successful(self):
  565. assert not self.ts.successful()
  566. def test_failed(self):
  567. assert self.ts.failed()
  568. class test_pending_Group:
  569. def setup(self):
  570. self.ts = self.app.GroupResult(
  571. uuid(), [self.app.AsyncResult(uuid()),
  572. self.app.AsyncResult(uuid())])
  573. def test_completed_count(self):
  574. assert self.ts.completed_count() == 0
  575. def test_ready(self):
  576. assert not self.ts.ready()
  577. def test_waiting(self):
  578. assert self.ts.waiting()
  579. def test_join(self):
  580. with pytest.raises(TimeoutError):
  581. self.ts.join(timeout=0.001)
  582. def test_join_longer(self):
  583. with pytest.raises(TimeoutError):
  584. self.ts.join(timeout=1)
  585. class test_EagerResult:
  586. def setup(self):
  587. @self.app.task(shared=False)
  588. def raising(x, y):
  589. raise KeyError(x, y)
  590. self.raising = raising
  591. def test_get_raises(self):
  592. res = self.raising.apply(args=[3, 3])
  593. with pytest.raises(KeyError):
  594. res.get()
  595. assert res.get(propagate=False)
  596. def test_get(self):
  597. res = EagerResult('x', 'x', states.RETRY)
  598. res.get()
  599. assert res.state == states.RETRY
  600. assert res.status == states.RETRY
  601. def test_forget(self):
  602. res = EagerResult('x', 'x', states.RETRY)
  603. res.forget()
  604. def test_revoke(self):
  605. res = self.raising.apply(args=[3, 3])
  606. assert not res.revoke()
  607. class test_tuples:
  608. def test_AsyncResult(self):
  609. x = self.app.AsyncResult(uuid())
  610. assert x, result_from_tuple(x.as_tuple() == self.app)
  611. assert x, result_from_tuple(x == self.app)
  612. def test_with_parent(self):
  613. x = self.app.AsyncResult(uuid())
  614. x.parent = self.app.AsyncResult(uuid())
  615. y = result_from_tuple(x.as_tuple(), self.app)
  616. assert y == x
  617. assert y.parent == x.parent
  618. assert isinstance(y.parent, AsyncResult)
  619. def test_compat(self):
  620. uid = uuid()
  621. x = result_from_tuple([uid, []], app=self.app)
  622. assert x.id == uid
  623. def test_GroupResult(self):
  624. x = self.app.GroupResult(
  625. uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
  626. )
  627. assert x, result_from_tuple(x.as_tuple() == self.app)
  628. assert x, result_from_tuple(x == self.app)