test_result.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832
  1. from __future__ import absolute_import, unicode_literals
  2. from contextlib import contextmanager
  3. from celery import states
  4. from celery.backends.base import SyncBackendMixin
  5. from celery.exceptions import (
  6. ImproperlyConfigured, IncompleteStream, TimeoutError,
  7. )
  8. from celery.five import range
  9. from celery.result import (
  10. AsyncResult,
  11. EagerResult,
  12. ResultSet,
  13. result_from_tuple,
  14. assert_will_not_block,
  15. )
  16. from celery.utils import uuid
  17. from celery.utils.serialization import pickle
  18. from celery.tests.case import (
  19. AppCase, Mock, call, depends_on_current_app, patch,
  20. )
  21. def mock_task(name, state, result):
  22. return dict(id=uuid(), name=name, state=state, result=result)
  23. def save_result(app, task):
  24. traceback = 'Some traceback'
  25. if task['state'] == states.SUCCESS:
  26. app.backend.mark_as_done(task['id'], task['result'])
  27. elif task['state'] == states.RETRY:
  28. app.backend.mark_as_retry(
  29. task['id'], task['result'], traceback=traceback,
  30. )
  31. else:
  32. app.backend.mark_as_failure(
  33. task['id'], task['result'], traceback=traceback,
  34. )
  35. def make_mock_group(app, size=10):
  36. tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)]
  37. [save_result(app, task) for task in tasks]
  38. return [app.AsyncResult(task['id']) for task in tasks]
  39. class test_AsyncResult(AppCase):
  40. def setup(self):
  41. self.app.conf.result_cache_max = 100
  42. self.app.conf.result_serializer = 'pickle'
  43. self.task1 = mock_task('task1', states.SUCCESS, 'the')
  44. self.task2 = mock_task('task2', states.SUCCESS, 'quick')
  45. self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))
  46. self.task4 = mock_task('task3', states.RETRY, KeyError('red'))
  47. for task in (self.task1, self.task2, self.task3, self.task4):
  48. save_result(self.app, task)
  49. @self.app.task(shared=False)
  50. def mytask():
  51. pass
  52. self.mytask = mytask
  53. @patch('celery.result.task_join_will_block')
  54. def test_assert_will_not_block(self, task_join_will_block):
  55. task_join_will_block.return_value = True
  56. with self.assertRaises(RuntimeError):
  57. assert_will_not_block()
  58. task_join_will_block.return_value = False
  59. assert_will_not_block()
  60. def test_without_id(self):
  61. with self.assertRaises(ValueError):
  62. AsyncResult(None, app=self.app)
  63. def test_compat_properties(self):
  64. x = self.app.AsyncResult('1')
  65. self.assertEqual(x.task_id, x.id)
  66. x.task_id = '2'
  67. self.assertEqual(x.id, '2')
  68. @depends_on_current_app
  69. def test_reduce_direct(self):
  70. x = AsyncResult('1', app=self.app)
  71. fun, args = x.__reduce__()
  72. self.assertEqual(fun(*args), x)
  73. def test_children(self):
  74. x = self.app.AsyncResult('1')
  75. children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  76. x._cache = {'children': children, 'status': states.SUCCESS}
  77. x.backend = Mock()
  78. self.assertTrue(x.children)
  79. self.assertEqual(len(x.children), 3)
  80. def test_propagates_for_parent(self):
  81. x = self.app.AsyncResult(uuid())
  82. x.backend = Mock(name='backend')
  83. x.backend.get_task_meta.return_value = {}
  84. x.backend.wait_for_pending.return_value = 84
  85. x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
  86. with self.assertRaises(KeyError):
  87. x.get(propagate=True)
  88. self.assertFalse(x.backend.wait_for_pending.called)
  89. x.parent = EagerResult(uuid(), 42, states.SUCCESS)
  90. self.assertEqual(x.get(propagate=True), 84)
  91. self.assertTrue(x.backend.wait_for_pending.called)
  92. def test_get_children(self):
  93. tid = uuid()
  94. x = self.app.AsyncResult(tid)
  95. child = [self.app.AsyncResult(uuid()).as_tuple()
  96. for i in range(10)]
  97. x._cache = {'children': child}
  98. self.assertTrue(x.children)
  99. self.assertEqual(len(x.children), 10)
  100. x._cache = {'status': states.SUCCESS}
  101. x.backend._cache[tid] = {'result': None}
  102. self.assertIsNone(x.children)
  103. def test_build_graph_get_leaf_collect(self):
  104. x = self.app.AsyncResult('1')
  105. x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
  106. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  107. x.iterdeps = Mock()
  108. x.iterdeps.return_value = (
  109. (None, x),
  110. (x, c[0]),
  111. (c[0], c[1]),
  112. (c[1], c[2])
  113. )
  114. x.backend.READY_STATES = states.READY_STATES
  115. self.assertTrue(x.graph)
  116. self.assertIs(x.get_leaf(), 2)
  117. it = x.collect()
  118. self.assertListEqual(list(it), [
  119. (x, None),
  120. (c[0], 0),
  121. (c[1], 1),
  122. (c[2], 2),
  123. ])
  124. def test_iterdeps(self):
  125. x = self.app.AsyncResult('1')
  126. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  127. x._cache = {'status': states.SUCCESS, 'result': None, 'children': c}
  128. for child in c:
  129. child.backend = Mock()
  130. child.backend.get_children.return_value = []
  131. it = x.iterdeps()
  132. self.assertListEqual(list(it), [
  133. (None, x),
  134. (x, c[0]),
  135. (x, c[1]),
  136. (x, c[2]),
  137. ])
  138. x._cache = None
  139. x.ready = Mock()
  140. x.ready.return_value = False
  141. with self.assertRaises(IncompleteStream):
  142. list(x.iterdeps())
  143. list(x.iterdeps(intermediate=True))
  144. def test_eq_not_implemented(self):
  145. self.assertNotEqual(self.app.AsyncResult('1'), object())
  146. @depends_on_current_app
  147. def test_reduce(self):
  148. a1 = self.app.AsyncResult('uuid')
  149. restored = pickle.loads(pickle.dumps(a1))
  150. self.assertEqual(restored.id, 'uuid')
  151. a2 = self.app.AsyncResult('uuid')
  152. self.assertEqual(pickle.loads(pickle.dumps(a2)).id, 'uuid')
  153. def test_maybe_set_cache_empty(self):
  154. self.app.AsyncResult('uuid')._maybe_set_cache(None)
  155. def test_set_cache__children(self):
  156. r1 = self.app.AsyncResult('id1')
  157. r2 = self.app.AsyncResult('id2')
  158. r1._set_cache({'children': [r2.as_tuple()]})
  159. self.assertIn(r2, r1.children)
  160. def test_successful(self):
  161. ok_res = self.app.AsyncResult(self.task1['id'])
  162. nok_res = self.app.AsyncResult(self.task3['id'])
  163. nok_res2 = self.app.AsyncResult(self.task4['id'])
  164. self.assertTrue(ok_res.successful())
  165. self.assertFalse(nok_res.successful())
  166. self.assertFalse(nok_res2.successful())
  167. pending_res = self.app.AsyncResult(uuid())
  168. self.assertFalse(pending_res.successful())
  169. def test_str(self):
  170. ok_res = self.app.AsyncResult(self.task1['id'])
  171. ok2_res = self.app.AsyncResult(self.task2['id'])
  172. nok_res = self.app.AsyncResult(self.task3['id'])
  173. self.assertEqual(str(ok_res), self.task1['id'])
  174. self.assertEqual(str(ok2_res), self.task2['id'])
  175. self.assertEqual(str(nok_res), self.task3['id'])
  176. pending_id = uuid()
  177. pending_res = self.app.AsyncResult(pending_id)
  178. self.assertEqual(str(pending_res), pending_id)
  179. def test_repr(self):
  180. ok_res = self.app.AsyncResult(self.task1['id'])
  181. ok2_res = self.app.AsyncResult(self.task2['id'])
  182. nok_res = self.app.AsyncResult(self.task3['id'])
  183. self.assertEqual(repr(ok_res), '<AsyncResult: %s>' % (
  184. self.task1['id']))
  185. self.assertEqual(repr(ok2_res), '<AsyncResult: %s>' % (
  186. self.task2['id']))
  187. self.assertEqual(repr(nok_res), '<AsyncResult: %s>' % (
  188. self.task3['id']))
  189. pending_id = uuid()
  190. pending_res = self.app.AsyncResult(pending_id)
  191. self.assertEqual(repr(pending_res), '<AsyncResult: %s>' % (
  192. pending_id))
  193. def test_hash(self):
  194. self.assertEqual(hash(self.app.AsyncResult('x0w991')),
  195. hash(self.app.AsyncResult('x0w991')))
  196. self.assertNotEqual(hash(self.app.AsyncResult('x0w991')),
  197. hash(self.app.AsyncResult('x1w991')))
  198. def test_get_traceback(self):
  199. ok_res = self.app.AsyncResult(self.task1['id'])
  200. nok_res = self.app.AsyncResult(self.task3['id'])
  201. nok_res2 = self.app.AsyncResult(self.task4['id'])
  202. self.assertFalse(ok_res.traceback)
  203. self.assertTrue(nok_res.traceback)
  204. self.assertTrue(nok_res2.traceback)
  205. pending_res = self.app.AsyncResult(uuid())
  206. self.assertFalse(pending_res.traceback)
  207. def test_get__backend_gives_None(self):
  208. res = self.app.AsyncResult(self.task1['id'])
  209. res.backend.wait_for = Mock(name='wait_for')
  210. res.backend.wait_for.return_value = None
  211. self.assertIsNone(res.get())
  212. def test_get(self):
  213. ok_res = self.app.AsyncResult(self.task1['id'])
  214. ok2_res = self.app.AsyncResult(self.task2['id'])
  215. nok_res = self.app.AsyncResult(self.task3['id'])
  216. nok2_res = self.app.AsyncResult(self.task4['id'])
  217. callback = Mock(name='callback')
  218. self.assertEqual(ok_res.get(callback=callback), 'the')
  219. callback.assert_called_with(ok_res.id, 'the')
  220. self.assertEqual(ok2_res.get(), 'quick')
  221. with self.assertRaises(KeyError):
  222. nok_res.get()
  223. self.assertTrue(nok_res.get(propagate=False))
  224. self.assertIsInstance(nok2_res.result, KeyError)
  225. self.assertEqual(ok_res.info, 'the')
  226. def test_eq_ne(self):
  227. r1 = self.app.AsyncResult(self.task1['id'])
  228. r2 = self.app.AsyncResult(self.task1['id'])
  229. r3 = self.app.AsyncResult(self.task2['id'])
  230. self.assertEqual(r1, r2)
  231. self.assertNotEqual(r1, r3)
  232. self.assertEqual(r1, r2.id)
  233. self.assertNotEqual(r1, r3.id)
  234. @depends_on_current_app
  235. def test_reduce_restore(self):
  236. r1 = self.app.AsyncResult(self.task1['id'])
  237. fun, args = r1.__reduce__()
  238. self.assertEqual(fun(*args), r1)
  239. def test_get_timeout(self):
  240. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  241. with self.assertRaises(TimeoutError):
  242. res.get(timeout=0.001)
  243. pending_res = self.app.AsyncResult(uuid())
  244. with patch('celery.result.time') as _time:
  245. with self.assertRaises(TimeoutError):
  246. pending_res.get(timeout=0.001, interval=0.001)
  247. _time.sleep.assert_called_with(0.001)
  248. def test_get_timeout_longer(self):
  249. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  250. with patch('celery.result.time') as _time:
  251. with self.assertRaises(TimeoutError):
  252. res.get(timeout=1, interval=1)
  253. _time.sleep.assert_called_with(1)
  254. def test_ready(self):
  255. oks = (self.app.AsyncResult(self.task1['id']),
  256. self.app.AsyncResult(self.task2['id']),
  257. self.app.AsyncResult(self.task3['id']))
  258. self.assertTrue(all(result.ready() for result in oks))
  259. self.assertFalse(self.app.AsyncResult(self.task4['id']).ready())
  260. self.assertFalse(self.app.AsyncResult(uuid()).ready())
  261. class test_ResultSet(AppCase):
  262. def test_resultset_repr(self):
  263. self.assertTrue(repr(self.app.ResultSet(
  264. [self.app.AsyncResult(t) for t in ['1', '2', '3']])))
  265. def test_eq_other(self):
  266. self.assertNotEqual(
  267. self.app.ResultSet([self.app.AsyncResult(t)
  268. for t in [1, 3, 3]]),
  269. 1,
  270. )
  271. rs1 = self.app.ResultSet([self.app.AsyncResult(1)])
  272. rs2 = self.app.ResultSet([self.app.AsyncResult(1)])
  273. self.assertEqual(rs1, rs2)
  274. def test_get(self):
  275. x = self.app.ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]])
  276. b = x.results[0].backend = Mock()
  277. b.supports_native_join = False
  278. x.join_native = Mock()
  279. x.join = Mock()
  280. x.get()
  281. self.assertTrue(x.join.called)
  282. b.supports_native_join = True
  283. x.get()
  284. self.assertTrue(x.join_native.called)
  285. def test_eq_ne(self):
  286. g1 = self.app.ResultSet([
  287. self.app.AsyncResult('id1'),
  288. self.app.AsyncResult('id2'),
  289. ])
  290. g2 = self.app.ResultSet([
  291. self.app.AsyncResult('id1'),
  292. self.app.AsyncResult('id2'),
  293. ])
  294. g3 = self.app.ResultSet([
  295. self.app.AsyncResult('id3'),
  296. self.app.AsyncResult('id1'),
  297. ])
  298. self.assertEqual(g1, g2)
  299. self.assertNotEqual(g1, g3)
  300. self.assertNotEqual(g1, object())
  301. def test_takes_app_from_first_task(self):
  302. x = ResultSet([self.app.AsyncResult('id1')])
  303. self.assertIs(x.app, x.results[0].app)
  304. x.app = self.app
  305. self.assertIs(x.app, self.app)
  306. def test_get_empty(self):
  307. x = self.app.ResultSet([])
  308. self.assertIsNone(x.supports_native_join)
  309. x.join = Mock(name='join')
  310. x.get()
  311. self.assertTrue(x.join.called)
  312. def test_add(self):
  313. x = self.app.ResultSet([self.app.AsyncResult(1)])
  314. x.add(self.app.AsyncResult(2))
  315. self.assertEqual(len(x), 2)
  316. x.add(self.app.AsyncResult(2))
  317. self.assertEqual(len(x), 2)
  318. @contextmanager
  319. def dummy_copy(self):
  320. with patch('celery.result.copy') as copy:
  321. def passt(arg):
  322. return arg
  323. copy.side_effect = passt
  324. yield
  325. def test_iterate_respects_subpolling_interval(self):
  326. r1 = self.app.AsyncResult(uuid())
  327. r2 = self.app.AsyncResult(uuid())
  328. backend = r1.backend = r2.backend = Mock()
  329. backend.subpolling_interval = 10
  330. ready = r1.ready = r2.ready = Mock()
  331. def se(*args, **kwargs):
  332. ready.side_effect = KeyError()
  333. return False
  334. ready.return_value = False
  335. ready.side_effect = se
  336. x = self.app.ResultSet([r1, r2])
  337. with self.dummy_copy():
  338. with patch('celery.result.time') as _time:
  339. with self.assertPendingDeprecation():
  340. with self.assertRaises(KeyError):
  341. list(x.iterate())
  342. _time.sleep.assert_called_with(10)
  343. backend.subpolling_interval = 0
  344. with patch('celery.result.time') as _time:
  345. with self.assertPendingDeprecation():
  346. with self.assertRaises(KeyError):
  347. ready.return_value = False
  348. ready.side_effect = se
  349. list(x.iterate())
  350. self.assertFalse(_time.sleep.called)
  351. def test_times_out(self):
  352. r1 = self.app.AsyncResult(uuid)
  353. r1.ready = Mock()
  354. r1.ready.return_value = False
  355. x = self.app.ResultSet([r1])
  356. with self.dummy_copy():
  357. with patch('celery.result.time'):
  358. with self.assertPendingDeprecation():
  359. with self.assertRaises(TimeoutError):
  360. list(x.iterate(timeout=1))
  361. def test_add_discard(self):
  362. x = self.app.ResultSet([])
  363. x.add(self.app.AsyncResult('1'))
  364. self.assertIn(self.app.AsyncResult('1'), x.results)
  365. x.discard(self.app.AsyncResult('1'))
  366. x.discard(self.app.AsyncResult('1'))
  367. x.discard('1')
  368. self.assertNotIn(self.app.AsyncResult('1'), x.results)
  369. x.update([self.app.AsyncResult('2')])
  370. def test_clear(self):
  371. x = self.app.ResultSet([])
  372. r = x.results
  373. x.clear()
  374. self.assertIs(x.results, r)
  375. class MockAsyncResultFailure(AsyncResult):
  376. @property
  377. def result(self):
  378. return KeyError('baz')
  379. @property
  380. def state(self):
  381. return states.FAILURE
  382. def get(self, propagate=True, **kwargs):
  383. if propagate:
  384. raise self.result
  385. return self.result
  386. class MockAsyncResultSuccess(AsyncResult):
  387. forgotten = False
  388. def forget(self):
  389. self.forgotten = True
  390. @property
  391. def result(self):
  392. return 42
  393. @property
  394. def state(self):
  395. return states.SUCCESS
  396. def get(self, **kwargs):
  397. return self.result
  398. class SimpleBackend(SyncBackendMixin):
  399. ids = []
  400. def __init__(self, ids=[]):
  401. self.ids = ids
  402. def _ensure_not_eager(self):
  403. pass
  404. def get_many(self, *args, **kwargs):
  405. return ((id, {'result': i, 'status': states.SUCCESS})
  406. for i, id in enumerate(self.ids))
  407. class test_GroupResult(AppCase):
  408. def setup(self):
  409. self.size = 10
  410. self.ts = self.app.GroupResult(
  411. uuid(), make_mock_group(self.app, self.size),
  412. )
  413. @depends_on_current_app
  414. def test_is_pickleable(self):
  415. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  416. self.assertEqual(pickle.loads(pickle.dumps(ts)), ts)
  417. ts2 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  418. self.assertEqual(pickle.loads(pickle.dumps(ts2)), ts2)
  419. @depends_on_current_app
  420. def test_reduce(self):
  421. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  422. fun, args = ts.__reduce__()
  423. ts2 = fun(*args)
  424. self.assertEqual(ts2.id, ts.id)
  425. self.assertEqual(ts, ts2)
  426. def test_eq_ne(self):
  427. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  428. ts2 = self.app.GroupResult(ts.id, ts.results)
  429. ts3 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  430. ts4 = self.app.GroupResult(ts.id, [self.app.AsyncResult(uuid())])
  431. self.assertEqual(ts, ts2)
  432. self.assertNotEqual(ts, ts3)
  433. self.assertNotEqual(ts, ts4)
  434. self.assertNotEqual(ts, object())
  435. def test_len(self):
  436. self.assertEqual(len(self.ts), self.size)
  437. def test_eq_other(self):
  438. self.assertNotEqual(self.ts, 1)
  439. @depends_on_current_app
  440. def test_pickleable(self):
  441. self.assertTrue(pickle.loads(pickle.dumps(self.ts)))
  442. def test_iterate_raises(self):
  443. ar = MockAsyncResultFailure(uuid(), app=self.app)
  444. ts = self.app.GroupResult(uuid(), [ar])
  445. with self.assertPendingDeprecation():
  446. it = ts.iterate()
  447. with self.assertRaises(KeyError):
  448. next(it)
  449. def test_forget(self):
  450. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  451. MockAsyncResultSuccess(uuid(), app=self.app)]
  452. ts = self.app.GroupResult(uuid(), subs)
  453. ts.forget()
  454. for sub in subs:
  455. self.assertTrue(sub.forgotten)
  456. def test_getitem(self):
  457. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  458. MockAsyncResultSuccess(uuid(), app=self.app)]
  459. ts = self.app.GroupResult(uuid(), subs)
  460. self.assertIs(ts[0], subs[0])
  461. def test_save_restore(self):
  462. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  463. MockAsyncResultSuccess(uuid(), app=self.app)]
  464. ts = self.app.GroupResult(uuid(), subs)
  465. ts.save()
  466. with self.assertRaises(AttributeError):
  467. ts.save(backend=object())
  468. self.assertEqual(self.app.GroupResult.restore(ts.id).results,
  469. ts.results)
  470. ts.delete()
  471. self.assertIsNone(self.app.GroupResult.restore(ts.id))
  472. with self.assertRaises(AttributeError):
  473. self.app.GroupResult.restore(ts.id, backend=object())
  474. def test_join_native(self):
  475. backend = SimpleBackend()
  476. results = [self.app.AsyncResult(uuid(), backend=backend)
  477. for i in range(10)]
  478. ts = self.app.GroupResult(uuid(), results)
  479. ts.app.backend = backend
  480. backend.ids = [result.id for result in results]
  481. res = ts.join_native()
  482. self.assertEqual(res, list(range(10)))
  483. callback = Mock(name='callback')
  484. self.assertFalse(ts.join_native(callback=callback))
  485. callback.assert_has_calls([
  486. call(r.id, i) for i, r in enumerate(ts.results)
  487. ])
  488. def test_join_native_raises(self):
  489. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  490. ts.iter_native = Mock()
  491. ts.iter_native.return_value = iter([
  492. (uuid(), {'status': states.FAILURE, 'result': KeyError()})
  493. ])
  494. with self.assertRaises(KeyError):
  495. ts.join_native(propagate=True)
  496. def test_failed_join_report(self):
  497. res = Mock()
  498. ts = self.app.GroupResult(uuid(), [res])
  499. res.state = states.FAILURE
  500. res.backend.is_cached.return_value = True
  501. self.assertIs(next(ts._failed_join_report()), res)
  502. res.backend.is_cached.return_value = False
  503. with self.assertRaises(StopIteration):
  504. next(ts._failed_join_report())
  505. def test_repr(self):
  506. self.assertTrue(repr(
  507. self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  508. ))
  509. def test_children_is_results(self):
  510. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  511. self.assertIs(ts.children, ts.results)
  512. def test_iter_native(self):
  513. backend = SimpleBackend()
  514. results = [self.app.AsyncResult(uuid(), backend=backend)
  515. for i in range(10)]
  516. ts = self.app.GroupResult(uuid(), results)
  517. ts.app.backend = backend
  518. backend.ids = [result.id for result in results]
  519. self.assertEqual(len(list(ts.iter_native())), 10)
  520. def test_iterate_yields(self):
  521. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  522. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  523. ts = self.app.GroupResult(uuid(), [ar, ar2])
  524. with self.assertPendingDeprecation():
  525. it = ts.iterate()
  526. self.assertEqual(next(it), 42)
  527. self.assertEqual(next(it), 42)
  528. def test_iterate_eager(self):
  529. ar1 = EagerResult(uuid(), 42, states.SUCCESS)
  530. ar2 = EagerResult(uuid(), 42, states.SUCCESS)
  531. ts = self.app.GroupResult(uuid(), [ar1, ar2])
  532. with self.assertPendingDeprecation():
  533. it = ts.iterate()
  534. self.assertEqual(next(it), 42)
  535. self.assertEqual(next(it), 42)
  536. def test_join_timeout(self):
  537. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  538. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  539. ar3 = self.app.AsyncResult(uuid())
  540. ts = self.app.GroupResult(uuid(), [ar, ar2, ar3])
  541. with self.assertRaises(TimeoutError):
  542. ts.join(timeout=0.0000001)
  543. ar4 = self.app.AsyncResult(uuid())
  544. ar4.get = Mock()
  545. ts2 = self.app.GroupResult(uuid(), [ar4])
  546. self.assertTrue(ts2.join(timeout=0.1))
  547. callback = Mock(name='callback')
  548. self.assertFalse(ts2.join(timeout=0.1, callback=callback))
  549. callback.assert_called_with(ar4.id, ar4.get())
  550. def test_iter_native_when_empty_group(self):
  551. ts = self.app.GroupResult(uuid(), [])
  552. self.assertListEqual(list(ts.iter_native()), [])
  553. def test_iterate_simple(self):
  554. with self.assertPendingDeprecation():
  555. it = self.ts.iterate()
  556. results = sorted(list(it))
  557. self.assertListEqual(results, list(range(self.size)))
  558. def test___iter__(self):
  559. self.assertListEqual(list(iter(self.ts)), self.ts.results)
  560. def test_join(self):
  561. joined = self.ts.join()
  562. self.assertListEqual(joined, list(range(self.size)))
  563. def test_successful(self):
  564. self.assertTrue(self.ts.successful())
  565. def test_failed(self):
  566. self.assertFalse(self.ts.failed())
  567. def test_maybe_throw(self):
  568. self.ts.results = [Mock(name='r1')]
  569. self.ts.maybe_throw()
  570. self.ts.results[0].maybe_throw.assert_called_with(
  571. callback=None, propagate=True,
  572. )
  573. def test_join__on_message(self):
  574. with self.assertRaises(ImproperlyConfigured):
  575. self.ts.join(on_message=Mock())
  576. def test_waiting(self):
  577. self.assertFalse(self.ts.waiting())
  578. def test_ready(self):
  579. self.assertTrue(self.ts.ready())
  580. def test_completed_count(self):
  581. self.assertEqual(self.ts.completed_count(), len(self.ts))
  582. class test_pending_AsyncResult(AppCase):
  583. def setup(self):
  584. self.task = self.app.AsyncResult(uuid())
  585. def test_result(self):
  586. self.assertIsNone(self.task.result)
  587. class test_failed_AsyncResult(test_GroupResult):
  588. def setup(self):
  589. self.app.conf.result_serializer = 'pickle'
  590. self.size = 11
  591. results = make_mock_group(self.app, 10)
  592. failed = mock_task('ts11', states.FAILURE, KeyError('Baz'))
  593. save_result(self.app, failed)
  594. failed_res = self.app.AsyncResult(failed['id'])
  595. self.ts = self.app.GroupResult(uuid(), results + [failed_res])
  596. def test_completed_count(self):
  597. self.assertEqual(self.ts.completed_count(), len(self.ts) - 1)
  598. def test_iterate_simple(self):
  599. with self.assertPendingDeprecation():
  600. it = self.ts.iterate()
  601. def consume():
  602. return list(it)
  603. with self.assertRaises(KeyError):
  604. consume()
  605. def test_join(self):
  606. with self.assertRaises(KeyError):
  607. self.ts.join()
  608. def test_successful(self):
  609. self.assertFalse(self.ts.successful())
  610. def test_failed(self):
  611. self.assertTrue(self.ts.failed())
  612. class test_pending_Group(AppCase):
  613. def setup(self):
  614. self.ts = self.app.GroupResult(
  615. uuid(), [self.app.AsyncResult(uuid()),
  616. self.app.AsyncResult(uuid())])
  617. def test_completed_count(self):
  618. self.assertEqual(self.ts.completed_count(), 0)
  619. def test_ready(self):
  620. self.assertFalse(self.ts.ready())
  621. def test_waiting(self):
  622. self.assertTrue(self.ts.waiting())
  623. def x_join(self):
  624. with self.assertRaises(TimeoutError):
  625. self.ts.join(timeout=0.001)
  626. def x_join_longer(self):
  627. with self.assertRaises(TimeoutError):
  628. self.ts.join(timeout=1)
  629. class test_EagerResult(AppCase):
  630. def setup(self):
  631. @self.app.task(shared=False)
  632. def raising(x, y):
  633. raise KeyError(x, y)
  634. self.raising = raising
  635. def test_wait_raises(self):
  636. res = self.raising.apply(args=[3, 3])
  637. with self.assertRaises(KeyError):
  638. res.wait()
  639. self.assertTrue(res.wait(propagate=False))
  640. def test_wait(self):
  641. res = EagerResult('x', 'x', states.RETRY)
  642. res.wait()
  643. self.assertEqual(res.state, states.RETRY)
  644. self.assertEqual(res.status, states.RETRY)
  645. def test_forget(self):
  646. res = EagerResult('x', 'x', states.RETRY)
  647. res.forget()
  648. def test_revoke(self):
  649. res = self.raising.apply(args=[3, 3])
  650. self.assertFalse(res.revoke())
  651. class test_tuples(AppCase):
  652. def test_AsyncResult(self):
  653. x = self.app.AsyncResult(uuid())
  654. self.assertEqual(x, result_from_tuple(x.as_tuple(), self.app))
  655. self.assertEqual(x, result_from_tuple(x, self.app))
  656. def test_with_parent(self):
  657. x = self.app.AsyncResult(uuid())
  658. x.parent = self.app.AsyncResult(uuid())
  659. y = result_from_tuple(x.as_tuple(), self.app)
  660. self.assertEqual(y, x)
  661. self.assertEqual(y.parent, x.parent)
  662. self.assertIsInstance(y.parent, AsyncResult)
  663. def test_compat(self):
  664. uid = uuid()
  665. x = result_from_tuple([uid, []], app=self.app)
  666. self.assertEqual(x.id, uid)
  667. def test_GroupResult(self):
  668. x = self.app.GroupResult(
  669. uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
  670. )
  671. self.assertEqual(x, result_from_tuple(x.as_tuple(), self.app))
  672. self.assertEqual(x, result_from_tuple(x, self.app))