# test_result.py (29 KB)
import traceback
from contextlib import contextmanager

from celery import uuid
from celery import states
from celery.backends.base import SyncBackendMixin
from celery.exceptions import (
    ImproperlyConfigured, IncompleteStream, TimeoutError,
)
from celery.result import (
    AsyncResult,
    EagerResult,
    ResultSet,
    result_from_tuple,
    assert_will_not_block,
)
from celery.utils.serialization import pickle

from celery.tests.case import (
    AppCase, Mock, call, depends_on_current_app, patch, skip,
)
  20. PYTRACEBACK = """\
  21. Traceback (most recent call last):
  22. File "foo.py", line 2, in foofunc
  23. don't matter
  24. File "bar.py", line 3, in barfunc
  25. don't matter
  26. Doesn't matter: really!\
  27. """
  28. def mock_task(name, state, result, traceback=None):
  29. return dict(
  30. id=uuid(), name=name, state=state,
  31. result=result, traceback=traceback,
  32. )
  33. def save_result(app, task):
  34. traceback = task.get('traceback') or 'Some traceback'
  35. if task['state'] == states.SUCCESS:
  36. app.backend.mark_as_done(task['id'], task['result'])
  37. elif task['state'] == states.RETRY:
  38. app.backend.mark_as_retry(
  39. task['id'], task['result'], traceback=traceback,
  40. )
  41. else:
  42. app.backend.mark_as_failure(
  43. task['id'], task['result'], traceback=traceback,
  44. )
  45. def make_mock_group(app, size=10):
  46. tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)]
  47. [save_result(app, task) for task in tasks]
  48. return [app.AsyncResult(task['id']) for task in tasks]
  49. class test_AsyncResult(AppCase):
  50. def setup(self):
  51. self.app.conf.result_cache_max = 100
  52. self.app.conf.result_serializer = 'pickle'
  53. self.task1 = mock_task('task1', states.SUCCESS, 'the')
  54. self.task2 = mock_task('task2', states.SUCCESS, 'quick')
  55. self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))
  56. self.task4 = mock_task('task3', states.RETRY, KeyError('red'))
  57. self.task5 = mock_task(
  58. 'task3', states.FAILURE, KeyError('blue'), PYTRACEBACK,
  59. )
  60. for task in (self.task1, self.task2,
  61. self.task3, self.task4, self.task5):
  62. save_result(self.app, task)
  63. @self.app.task(shared=False)
  64. def mytask():
  65. pass
  66. self.mytask = mytask
  67. @patch('celery.result.task_join_will_block')
  68. def test_assert_will_not_block(self, task_join_will_block):
  69. task_join_will_block.return_value = True
  70. with self.assertRaises(RuntimeError):
  71. assert_will_not_block()
  72. task_join_will_block.return_value = False
  73. assert_will_not_block()
  74. def test_without_id(self):
  75. with self.assertRaises(ValueError):
  76. AsyncResult(None, app=self.app)
  77. def test_compat_properties(self):
  78. x = self.app.AsyncResult('1')
  79. self.assertEqual(x.task_id, x.id)
  80. x.task_id = '2'
  81. self.assertEqual(x.id, '2')
  82. @depends_on_current_app
  83. def test_reduce_direct(self):
  84. x = AsyncResult('1', app=self.app)
  85. fun, args = x.__reduce__()
  86. self.assertEqual(fun(*args), x)
  87. def test_children(self):
  88. x = self.app.AsyncResult('1')
  89. children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  90. x._cache = {'children': children, 'status': states.SUCCESS}
  91. x.backend = Mock()
  92. self.assertTrue(x.children)
  93. self.assertEqual(len(x.children), 3)
  94. def test_propagates_for_parent(self):
  95. x = self.app.AsyncResult(uuid())
  96. x.backend = Mock(name='backend')
  97. x.backend.get_task_meta.return_value = {}
  98. x.backend.wait_for_pending.return_value = 84
  99. x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
  100. with self.assertRaises(KeyError):
  101. x.get(propagate=True)
  102. x.backend.wait_for_pending.assert_not_called()
  103. x.parent = EagerResult(uuid(), 42, states.SUCCESS)
  104. self.assertEqual(x.get(propagate=True), 84)
  105. x.backend.wait_for_pending.assert_called()
  106. def test_get_children(self):
  107. tid = uuid()
  108. x = self.app.AsyncResult(tid)
  109. child = [self.app.AsyncResult(uuid()).as_tuple()
  110. for i in range(10)]
  111. x._cache = {'children': child}
  112. self.assertTrue(x.children)
  113. self.assertEqual(len(x.children), 10)
  114. x._cache = {'status': states.SUCCESS}
  115. x.backend._cache[tid] = {'result': None}
  116. self.assertIsNone(x.children)
  117. def test_build_graph_get_leaf_collect(self):
  118. x = self.app.AsyncResult('1')
  119. x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
  120. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  121. x.iterdeps = Mock()
  122. x.iterdeps.return_value = (
  123. (None, x),
  124. (x, c[0]),
  125. (c[0], c[1]),
  126. (c[1], c[2])
  127. )
  128. x.backend.READY_STATES = states.READY_STATES
  129. self.assertTrue(x.graph)
  130. self.assertIs(x.get_leaf(), 2)
  131. it = x.collect()
  132. self.assertListEqual(list(it), [
  133. (x, None),
  134. (c[0], 0),
  135. (c[1], 1),
  136. (c[2], 2),
  137. ])
  138. def test_iterdeps(self):
  139. x = self.app.AsyncResult('1')
  140. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  141. x._cache = {'status': states.SUCCESS, 'result': None, 'children': c}
  142. for child in c:
  143. child.backend = Mock()
  144. child.backend.get_children.return_value = []
  145. it = x.iterdeps()
  146. self.assertListEqual(list(it), [
  147. (None, x),
  148. (x, c[0]),
  149. (x, c[1]),
  150. (x, c[2]),
  151. ])
  152. x._cache = None
  153. x.ready = Mock()
  154. x.ready.return_value = False
  155. with self.assertRaises(IncompleteStream):
  156. list(x.iterdeps())
  157. list(x.iterdeps(intermediate=True))
  158. def test_eq_not_implemented(self):
  159. self.assertNotEqual(self.app.AsyncResult('1'), object())
  160. @depends_on_current_app
  161. def test_reduce(self):
  162. a1 = self.app.AsyncResult('uuid')
  163. restored = pickle.loads(pickle.dumps(a1))
  164. self.assertEqual(restored.id, 'uuid')
  165. a2 = self.app.AsyncResult('uuid')
  166. self.assertEqual(pickle.loads(pickle.dumps(a2)).id, 'uuid')
  167. def test_maybe_set_cache_empty(self):
  168. self.app.AsyncResult('uuid')._maybe_set_cache(None)
  169. def test_set_cache__children(self):
  170. r1 = self.app.AsyncResult('id1')
  171. r2 = self.app.AsyncResult('id2')
  172. r1._set_cache({'children': [r2.as_tuple()]})
  173. self.assertIn(r2, r1.children)
  174. def test_successful(self):
  175. ok_res = self.app.AsyncResult(self.task1['id'])
  176. nok_res = self.app.AsyncResult(self.task3['id'])
  177. nok_res2 = self.app.AsyncResult(self.task4['id'])
  178. self.assertTrue(ok_res.successful())
  179. self.assertFalse(nok_res.successful())
  180. self.assertFalse(nok_res2.successful())
  181. pending_res = self.app.AsyncResult(uuid())
  182. self.assertFalse(pending_res.successful())
  183. def test_raising(self):
  184. notb = self.app.AsyncResult(self.task3['id'])
  185. withtb = self.app.AsyncResult(self.task5['id'])
  186. with self.assertRaises(KeyError):
  187. notb.get()
  188. try:
  189. withtb.get()
  190. except KeyError:
  191. tb = traceback.format_exc()
  192. self.assertNotIn(' File "foo.py", line 2, in foofunc', tb)
  193. self.assertNotIn(' File "bar.py", line 3, in barfunc', tb)
  194. self.assertIn('KeyError:', tb)
  195. self.assertIn("'blue'", tb)
  196. else:
  197. raise AssertionError('Did not raise KeyError.')
  198. @skip.unless_module('tblib')
  199. def test_raising_remote_tracebacks(self):
  200. withtb = self.app.AsyncResult(self.task5['id'])
  201. old, self.app.conf.remote_tracebacks = (
  202. self.app.conf.remote_tracebacks, True)
  203. try:
  204. withtb.get()
  205. except KeyError:
  206. tb = traceback.format_exc()
  207. self.assertIn(' File "foo.py", line 2, in foofunc', tb)
  208. self.assertIn(' File "bar.py", line 3, in barfunc', tb)
  209. self.assertIn('KeyError:', tb)
  210. self.assertIn("'blue'", tb)
  211. else:
  212. raise AssertionError('Did not raise KeyError.')
  213. finally:
  214. self.app.conf.remote_tracebacks = old
  215. def test_str(self):
  216. ok_res = self.app.AsyncResult(self.task1['id'])
  217. ok2_res = self.app.AsyncResult(self.task2['id'])
  218. nok_res = self.app.AsyncResult(self.task3['id'])
  219. self.assertEqual(str(ok_res), self.task1['id'])
  220. self.assertEqual(str(ok2_res), self.task2['id'])
  221. self.assertEqual(str(nok_res), self.task3['id'])
  222. pending_id = uuid()
  223. pending_res = self.app.AsyncResult(pending_id)
  224. self.assertEqual(str(pending_res), pending_id)
  225. def test_repr(self):
  226. ok_res = self.app.AsyncResult(self.task1['id'])
  227. ok2_res = self.app.AsyncResult(self.task2['id'])
  228. nok_res = self.app.AsyncResult(self.task3['id'])
  229. self.assertEqual(repr(ok_res), '<AsyncResult: %s>' % (
  230. self.task1['id']))
  231. self.assertEqual(repr(ok2_res), '<AsyncResult: %s>' % (
  232. self.task2['id']))
  233. self.assertEqual(repr(nok_res), '<AsyncResult: %s>' % (
  234. self.task3['id']))
  235. pending_id = uuid()
  236. pending_res = self.app.AsyncResult(pending_id)
  237. self.assertEqual(repr(pending_res), '<AsyncResult: %s>' % (
  238. pending_id))
  239. def test_hash(self):
  240. self.assertEqual(hash(self.app.AsyncResult('x0w991')),
  241. hash(self.app.AsyncResult('x0w991')))
  242. self.assertNotEqual(hash(self.app.AsyncResult('x0w991')),
  243. hash(self.app.AsyncResult('x1w991')))
  244. def test_get_traceback(self):
  245. ok_res = self.app.AsyncResult(self.task1['id'])
  246. nok_res = self.app.AsyncResult(self.task3['id'])
  247. nok_res2 = self.app.AsyncResult(self.task4['id'])
  248. self.assertFalse(ok_res.traceback)
  249. self.assertTrue(nok_res.traceback)
  250. self.assertTrue(nok_res2.traceback)
  251. pending_res = self.app.AsyncResult(uuid())
  252. self.assertFalse(pending_res.traceback)
  253. def test_get__backend_gives_None(self):
  254. res = self.app.AsyncResult(self.task1['id'])
  255. res.backend.wait_for = Mock(name='wait_for')
  256. res.backend.wait_for.return_value = None
  257. self.assertIsNone(res.get())
  258. def test_get(self):
  259. ok_res = self.app.AsyncResult(self.task1['id'])
  260. ok2_res = self.app.AsyncResult(self.task2['id'])
  261. nok_res = self.app.AsyncResult(self.task3['id'])
  262. nok2_res = self.app.AsyncResult(self.task4['id'])
  263. callback = Mock(name='callback')
  264. self.assertEqual(ok_res.get(callback=callback), 'the')
  265. callback.assert_called_with(ok_res.id, 'the')
  266. self.assertEqual(ok2_res.get(), 'quick')
  267. with self.assertRaises(KeyError):
  268. nok_res.get()
  269. self.assertTrue(nok_res.get(propagate=False))
  270. self.assertIsInstance(nok2_res.result, KeyError)
  271. self.assertEqual(ok_res.info, 'the')
  272. def test_eq_ne(self):
  273. r1 = self.app.AsyncResult(self.task1['id'])
  274. r2 = self.app.AsyncResult(self.task1['id'])
  275. r3 = self.app.AsyncResult(self.task2['id'])
  276. self.assertEqual(r1, r2)
  277. self.assertNotEqual(r1, r3)
  278. self.assertEqual(r1, r2.id)
  279. self.assertNotEqual(r1, r3.id)
  280. @depends_on_current_app
  281. def test_reduce_restore(self):
  282. r1 = self.app.AsyncResult(self.task1['id'])
  283. fun, args = r1.__reduce__()
  284. self.assertEqual(fun(*args), r1)
  285. def test_get_timeout(self):
  286. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  287. with self.assertRaises(TimeoutError):
  288. res.get(timeout=0.001)
  289. pending_res = self.app.AsyncResult(uuid())
  290. with patch('celery.result.time') as _time:
  291. with self.assertRaises(TimeoutError):
  292. pending_res.get(timeout=0.001, interval=0.001)
  293. _time.sleep.assert_called_with(0.001)
  294. def test_get_timeout_longer(self):
  295. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  296. with patch('celery.result.time') as _time:
  297. with self.assertRaises(TimeoutError):
  298. res.get(timeout=1, interval=1)
  299. _time.sleep.assert_called_with(1)
  300. def test_ready(self):
  301. oks = (self.app.AsyncResult(self.task1['id']),
  302. self.app.AsyncResult(self.task2['id']),
  303. self.app.AsyncResult(self.task3['id']))
  304. self.assertTrue(all(result.ready() for result in oks))
  305. self.assertFalse(self.app.AsyncResult(self.task4['id']).ready())
  306. self.assertFalse(self.app.AsyncResult(uuid()).ready())
  307. class test_ResultSet(AppCase):
  308. def test_resultset_repr(self):
  309. self.assertTrue(repr(self.app.ResultSet(
  310. [self.app.AsyncResult(t) for t in ['1', '2', '3']])))
  311. def test_eq_other(self):
  312. self.assertNotEqual(
  313. self.app.ResultSet([self.app.AsyncResult(t)
  314. for t in [1, 3, 3]]),
  315. 1,
  316. )
  317. rs1 = self.app.ResultSet([self.app.AsyncResult(1)])
  318. rs2 = self.app.ResultSet([self.app.AsyncResult(1)])
  319. self.assertEqual(rs1, rs2)
  320. def test_get(self):
  321. x = self.app.ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]])
  322. b = x.results[0].backend = Mock()
  323. b.supports_native_join = False
  324. x.join_native = Mock()
  325. x.join = Mock()
  326. x.get()
  327. x.join.assert_called()
  328. b.supports_native_join = True
  329. x.get()
  330. x.join_native.assert_called()
  331. def test_eq_ne(self):
  332. g1 = self.app.ResultSet([
  333. self.app.AsyncResult('id1'),
  334. self.app.AsyncResult('id2'),
  335. ])
  336. g2 = self.app.ResultSet([
  337. self.app.AsyncResult('id1'),
  338. self.app.AsyncResult('id2'),
  339. ])
  340. g3 = self.app.ResultSet([
  341. self.app.AsyncResult('id3'),
  342. self.app.AsyncResult('id1'),
  343. ])
  344. self.assertEqual(g1, g2)
  345. self.assertNotEqual(g1, g3)
  346. self.assertNotEqual(g1, object())
  347. def test_takes_app_from_first_task(self):
  348. x = ResultSet([self.app.AsyncResult('id1')])
  349. self.assertIs(x.app, x.results[0].app)
  350. x.app = self.app
  351. self.assertIs(x.app, self.app)
  352. def test_get_empty(self):
  353. x = self.app.ResultSet([])
  354. self.assertIsNone(x.supports_native_join)
  355. x.join = Mock(name='join')
  356. x.get()
  357. x.join.assert_called()
  358. def test_add(self):
  359. x = self.app.ResultSet([self.app.AsyncResult(1)])
  360. x.add(self.app.AsyncResult(2))
  361. self.assertEqual(len(x), 2)
  362. x.add(self.app.AsyncResult(2))
  363. self.assertEqual(len(x), 2)
  364. @contextmanager
  365. def dummy_copy(self):
  366. with patch('celery.result.copy') as copy:
  367. def passt(arg):
  368. return arg
  369. copy.side_effect = passt
  370. yield
  371. def test_iterate_respects_subpolling_interval(self):
  372. r1 = self.app.AsyncResult(uuid())
  373. r2 = self.app.AsyncResult(uuid())
  374. backend = r1.backend = r2.backend = Mock()
  375. backend.subpolling_interval = 10
  376. ready = r1.ready = r2.ready = Mock()
  377. def se(*args, **kwargs):
  378. ready.side_effect = KeyError()
  379. return False
  380. ready.return_value = False
  381. ready.side_effect = se
  382. x = self.app.ResultSet([r1, r2])
  383. with self.dummy_copy():
  384. with patch('celery.result.time') as _time:
  385. with self.assertPendingDeprecation():
  386. with self.assertRaises(KeyError):
  387. list(x.iterate())
  388. _time.sleep.assert_called_with(10)
  389. backend.subpolling_interval = 0
  390. with patch('celery.result.time') as _time:
  391. with self.assertPendingDeprecation():
  392. with self.assertRaises(KeyError):
  393. ready.return_value = False
  394. ready.side_effect = se
  395. list(x.iterate())
  396. _time.sleep.assert_not_called()
  397. def test_times_out(self):
  398. r1 = self.app.AsyncResult(uuid)
  399. r1.ready = Mock()
  400. r1.ready.return_value = False
  401. x = self.app.ResultSet([r1])
  402. with self.dummy_copy():
  403. with patch('celery.result.time'):
  404. with self.assertPendingDeprecation():
  405. with self.assertRaises(TimeoutError):
  406. list(x.iterate(timeout=1))
  407. def test_add_discard(self):
  408. x = self.app.ResultSet([])
  409. x.add(self.app.AsyncResult('1'))
  410. self.assertIn(self.app.AsyncResult('1'), x.results)
  411. x.discard(self.app.AsyncResult('1'))
  412. x.discard(self.app.AsyncResult('1'))
  413. x.discard('1')
  414. self.assertNotIn(self.app.AsyncResult('1'), x.results)
  415. x.update([self.app.AsyncResult('2')])
  416. def test_clear(self):
  417. x = self.app.ResultSet([])
  418. r = x.results
  419. x.clear()
  420. self.assertIs(x.results, r)
  421. class MockAsyncResultFailure(AsyncResult):
  422. @property
  423. def result(self):
  424. return KeyError('baz')
  425. @property
  426. def state(self):
  427. return states.FAILURE
  428. def get(self, propagate=True, **kwargs):
  429. if propagate:
  430. raise self.result
  431. return self.result
  432. class MockAsyncResultSuccess(AsyncResult):
  433. forgotten = False
  434. def forget(self):
  435. self.forgotten = True
  436. @property
  437. def result(self):
  438. return 42
  439. @property
  440. def state(self):
  441. return states.SUCCESS
  442. def get(self, **kwargs):
  443. return self.result
  444. class SimpleBackend(SyncBackendMixin):
  445. ids = []
  446. def __init__(self, ids=[]):
  447. self.ids = ids
  448. def _ensure_not_eager(self):
  449. pass
  450. def get_many(self, *args, **kwargs):
  451. return ((id, {'result': i, 'status': states.SUCCESS})
  452. for i, id in enumerate(self.ids))
  453. class test_GroupResult(AppCase):
  454. def setup(self):
  455. self.size = 10
  456. self.ts = self.app.GroupResult(
  457. uuid(), make_mock_group(self.app, self.size),
  458. )
  459. @depends_on_current_app
  460. def test_is_pickleable(self):
  461. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  462. self.assertEqual(pickle.loads(pickle.dumps(ts)), ts)
  463. ts2 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  464. self.assertEqual(pickle.loads(pickle.dumps(ts2)), ts2)
  465. @depends_on_current_app
  466. def test_reduce(self):
  467. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  468. fun, args = ts.__reduce__()
  469. ts2 = fun(*args)
  470. self.assertEqual(ts2.id, ts.id)
  471. self.assertEqual(ts, ts2)
  472. def test_eq_ne(self):
  473. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  474. ts2 = self.app.GroupResult(ts.id, ts.results)
  475. ts3 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  476. ts4 = self.app.GroupResult(ts.id, [self.app.AsyncResult(uuid())])
  477. self.assertEqual(ts, ts2)
  478. self.assertNotEqual(ts, ts3)
  479. self.assertNotEqual(ts, ts4)
  480. self.assertNotEqual(ts, object())
  481. def test_len(self):
  482. self.assertEqual(len(self.ts), self.size)
  483. def test_eq_other(self):
  484. self.assertNotEqual(self.ts, 1)
  485. @depends_on_current_app
  486. def test_pickleable(self):
  487. self.assertTrue(pickle.loads(pickle.dumps(self.ts)))
  488. def test_iterate_raises(self):
  489. ar = MockAsyncResultFailure(uuid(), app=self.app)
  490. ts = self.app.GroupResult(uuid(), [ar])
  491. with self.assertPendingDeprecation():
  492. it = ts.iterate()
  493. with self.assertRaises(KeyError):
  494. next(it)
  495. def test_forget(self):
  496. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  497. MockAsyncResultSuccess(uuid(), app=self.app)]
  498. ts = self.app.GroupResult(uuid(), subs)
  499. ts.forget()
  500. for sub in subs:
  501. self.assertTrue(sub.forgotten)
  502. def test_getitem(self):
  503. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  504. MockAsyncResultSuccess(uuid(), app=self.app)]
  505. ts = self.app.GroupResult(uuid(), subs)
  506. self.assertIs(ts[0], subs[0])
  507. def test_save_restore(self):
  508. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  509. MockAsyncResultSuccess(uuid(), app=self.app)]
  510. ts = self.app.GroupResult(uuid(), subs)
  511. ts.save()
  512. with self.assertRaises(AttributeError):
  513. ts.save(backend=object())
  514. self.assertEqual(self.app.GroupResult.restore(ts.id).results,
  515. ts.results)
  516. ts.delete()
  517. self.assertIsNone(self.app.GroupResult.restore(ts.id))
  518. with self.assertRaises(AttributeError):
  519. self.app.GroupResult.restore(ts.id, backend=object())
  520. def test_join_native(self):
  521. backend = SimpleBackend()
  522. results = [self.app.AsyncResult(uuid(), backend=backend)
  523. for i in range(10)]
  524. ts = self.app.GroupResult(uuid(), results)
  525. ts.app.backend = backend
  526. backend.ids = [result.id for result in results]
  527. res = ts.join_native()
  528. self.assertEqual(res, list(range(10)))
  529. callback = Mock(name='callback')
  530. self.assertFalse(ts.join_native(callback=callback))
  531. callback.assert_has_calls([
  532. call(r.id, i) for i, r in enumerate(ts.results)
  533. ])
  534. def test_join_native_raises(self):
  535. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  536. ts.iter_native = Mock()
  537. ts.iter_native.return_value = iter([
  538. (uuid(), {'status': states.FAILURE, 'result': KeyError()})
  539. ])
  540. with self.assertRaises(KeyError):
  541. ts.join_native(propagate=True)
  542. def test_failed_join_report(self):
  543. res = Mock()
  544. ts = self.app.GroupResult(uuid(), [res])
  545. res.state = states.FAILURE
  546. res.backend.is_cached.return_value = True
  547. self.assertIs(next(ts._failed_join_report()), res)
  548. res.backend.is_cached.return_value = False
  549. with self.assertRaises(StopIteration):
  550. next(ts._failed_join_report())
  551. def test_repr(self):
  552. self.assertTrue(repr(
  553. self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  554. ))
  555. def test_children_is_results(self):
  556. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  557. self.assertIs(ts.children, ts.results)
  558. def test_iter_native(self):
  559. backend = SimpleBackend()
  560. results = [self.app.AsyncResult(uuid(), backend=backend)
  561. for i in range(10)]
  562. ts = self.app.GroupResult(uuid(), results)
  563. ts.app.backend = backend
  564. backend.ids = [result.id for result in results]
  565. self.assertEqual(len(list(ts.iter_native())), 10)
  566. def test_iterate_yields(self):
  567. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  568. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  569. ts = self.app.GroupResult(uuid(), [ar, ar2])
  570. with self.assertPendingDeprecation():
  571. it = ts.iterate()
  572. self.assertEqual(next(it), 42)
  573. self.assertEqual(next(it), 42)
  574. def test_iterate_eager(self):
  575. ar1 = EagerResult(uuid(), 42, states.SUCCESS)
  576. ar2 = EagerResult(uuid(), 42, states.SUCCESS)
  577. ts = self.app.GroupResult(uuid(), [ar1, ar2])
  578. with self.assertPendingDeprecation():
  579. it = ts.iterate()
  580. self.assertEqual(next(it), 42)
  581. self.assertEqual(next(it), 42)
  582. def test_join_timeout(self):
  583. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  584. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  585. ar3 = self.app.AsyncResult(uuid())
  586. ts = self.app.GroupResult(uuid(), [ar, ar2, ar3])
  587. with self.assertRaises(TimeoutError):
  588. ts.join(timeout=0.0000001)
  589. ar4 = self.app.AsyncResult(uuid())
  590. ar4.get = Mock()
  591. ts2 = self.app.GroupResult(uuid(), [ar4])
  592. self.assertTrue(ts2.join(timeout=0.1))
  593. callback = Mock(name='callback')
  594. self.assertFalse(ts2.join(timeout=0.1, callback=callback))
  595. callback.assert_called_with(ar4.id, ar4.get())
  596. def test_iter_native_when_empty_group(self):
  597. ts = self.app.GroupResult(uuid(), [])
  598. self.assertListEqual(list(ts.iter_native()), [])
  599. def test_iterate_simple(self):
  600. with self.assertPendingDeprecation():
  601. it = self.ts.iterate()
  602. results = sorted(list(it))
  603. self.assertListEqual(results, list(range(self.size)))
  604. def test___iter__(self):
  605. self.assertListEqual(list(iter(self.ts)), self.ts.results)
  606. def test_join(self):
  607. joined = self.ts.join()
  608. self.assertListEqual(joined, list(range(self.size)))
  609. def test_successful(self):
  610. self.assertTrue(self.ts.successful())
  611. def test_failed(self):
  612. self.assertFalse(self.ts.failed())
  613. def test_maybe_throw(self):
  614. self.ts.results = [Mock(name='r1')]
  615. self.ts.maybe_throw()
  616. self.ts.results[0].maybe_throw.assert_called_with(
  617. callback=None, propagate=True,
  618. )
  619. def test_join__on_message(self):
  620. with self.assertRaises(ImproperlyConfigured):
  621. self.ts.join(on_message=Mock())
  622. def test_waiting(self):
  623. self.assertFalse(self.ts.waiting())
  624. def test_ready(self):
  625. self.assertTrue(self.ts.ready())
  626. def test_completed_count(self):
  627. self.assertEqual(self.ts.completed_count(), len(self.ts))
  628. class test_pending_AsyncResult(AppCase):
  629. def setup(self):
  630. self.task = self.app.AsyncResult(uuid())
  631. def test_result(self):
  632. self.assertIsNone(self.task.result)
  633. class test_failed_AsyncResult(test_GroupResult):
  634. def setup(self):
  635. self.app.conf.result_serializer = 'pickle'
  636. self.size = 11
  637. results = make_mock_group(self.app, 10)
  638. failed = mock_task('ts11', states.FAILURE, KeyError('Baz'))
  639. save_result(self.app, failed)
  640. failed_res = self.app.AsyncResult(failed['id'])
  641. self.ts = self.app.GroupResult(uuid(), results + [failed_res])
  642. def test_completed_count(self):
  643. self.assertEqual(self.ts.completed_count(), len(self.ts) - 1)
  644. def test_iterate_simple(self):
  645. with self.assertPendingDeprecation():
  646. it = self.ts.iterate()
  647. def consume():
  648. return list(it)
  649. with self.assertRaises(KeyError):
  650. consume()
  651. def test_join(self):
  652. with self.assertRaises(KeyError):
  653. self.ts.join()
  654. def test_successful(self):
  655. self.assertFalse(self.ts.successful())
  656. def test_failed(self):
  657. self.assertTrue(self.ts.failed())
  658. class test_pending_Group(AppCase):
  659. def setup(self):
  660. self.ts = self.app.GroupResult(
  661. uuid(), [self.app.AsyncResult(uuid()),
  662. self.app.AsyncResult(uuid())])
  663. def test_completed_count(self):
  664. self.assertEqual(self.ts.completed_count(), 0)
  665. def test_ready(self):
  666. self.assertFalse(self.ts.ready())
  667. def test_waiting(self):
  668. self.assertTrue(self.ts.waiting())
  669. def test_join(self):
  670. with self.assertRaises(TimeoutError):
  671. self.ts.join(timeout=0.001)
  672. def test_join_longer(self):
  673. with self.assertRaises(TimeoutError):
  674. self.ts.join(timeout=1)
  675. class test_EagerResult(AppCase):
  676. def setup(self):
  677. @self.app.task(shared=False)
  678. def raising(x, y):
  679. raise KeyError(x, y)
  680. self.raising = raising
  681. def test_get_raises(self):
  682. res = self.raising.apply(args=[3, 3])
  683. with self.assertRaises(KeyError):
  684. res.get()
  685. self.assertTrue(res.get(propagate=False))
  686. def test_get(self):
  687. res = EagerResult('x', 'x', states.RETRY)
  688. res.get()
  689. self.assertEqual(res.state, states.RETRY)
  690. self.assertEqual(res.status, states.RETRY)
  691. def test_forget(self):
  692. res = EagerResult('x', 'x', states.RETRY)
  693. res.forget()
  694. def test_revoke(self):
  695. res = self.raising.apply(args=[3, 3])
  696. self.assertFalse(res.revoke())
  697. class test_tuples(AppCase):
  698. def test_AsyncResult(self):
  699. x = self.app.AsyncResult(uuid())
  700. self.assertEqual(x, result_from_tuple(x.as_tuple(), self.app))
  701. self.assertEqual(x, result_from_tuple(x, self.app))
  702. def test_with_parent(self):
  703. x = self.app.AsyncResult(uuid())
  704. x.parent = self.app.AsyncResult(uuid())
  705. y = result_from_tuple(x.as_tuple(), self.app)
  706. self.assertEqual(y, x)
  707. self.assertEqual(y.parent, x.parent)
  708. self.assertIsInstance(y.parent, AsyncResult)
  709. def test_compat(self):
  710. uid = uuid()
  711. x = result_from_tuple([uid, []], app=self.app)
  712. self.assertEqual(x.id, uid)
  713. def test_GroupResult(self):
  714. x = self.app.GroupResult(
  715. uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
  716. )
  717. self.assertEqual(x, result_from_tuple(x.as_tuple(), self.app))
  718. self.assertEqual(x, result_from_tuple(x, self.app))