# test_result.py
  1. from __future__ import absolute_import
  2. from contextlib import contextmanager
  3. from celery import states
  4. from celery.exceptions import IncompleteStream, TimeoutError
  5. from celery.five import range
  6. from celery.result import (
  7. AsyncResult,
  8. EagerResult,
  9. TaskSetResult,
  10. result_from_tuple,
  11. )
  12. from celery.utils import uuid
  13. from celery.utils.serialization import pickle
  14. from celery.tests.case import AppCase, Mock, depends_on_current_app, patch
  15. def mock_task(name, state, result):
  16. return dict(id=uuid(), name=name, state=state, result=result)
  17. def save_result(app, task):
  18. traceback = 'Some traceback'
  19. if task['state'] == states.SUCCESS:
  20. app.backend.mark_as_done(task['id'], task['result'])
  21. elif task['state'] == states.RETRY:
  22. app.backend.mark_as_retry(
  23. task['id'], task['result'], traceback=traceback,
  24. )
  25. else:
  26. app.backend.mark_as_failure(
  27. task['id'], task['result'], traceback=traceback,
  28. )
  29. def make_mock_group(app, size=10):
  30. tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)]
  31. [save_result(app, task) for task in tasks]
  32. return [app.AsyncResult(task['id']) for task in tasks]
  33. class test_AsyncResult(AppCase):
  34. def setup(self):
  35. self.app.conf.CELERY_RESULT_SERIALIZER = 'pickle'
  36. self.task1 = mock_task('task1', states.SUCCESS, 'the')
  37. self.task2 = mock_task('task2', states.SUCCESS, 'quick')
  38. self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))
  39. self.task4 = mock_task('task3', states.RETRY, KeyError('red'))
  40. for task in (self.task1, self.task2, self.task3, self.task4):
  41. save_result(self.app, task)
  42. @self.app.task(shared=False)
  43. def mytask():
  44. pass
  45. self.mytask = mytask
  46. def test_compat_properties(self):
  47. x = self.app.AsyncResult('1')
  48. self.assertEqual(x.task_id, x.id)
  49. x.task_id = '2'
  50. self.assertEqual(x.id, '2')
  51. def test_children(self):
  52. x = self.app.AsyncResult('1')
  53. children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  54. x._cache = {'children': children, 'status': states.SUCCESS}
  55. x.backend = Mock()
  56. self.assertTrue(x.children)
  57. self.assertEqual(len(x.children), 3)
  58. def test_propagates_for_parent(self):
  59. x = self.app.AsyncResult(uuid())
  60. x.backend = Mock(name='backend')
  61. x.backend.get_task_meta.return_value = {}
  62. x.backend.wait_for.return_value = {
  63. 'status': states.SUCCESS, 'result': 84,
  64. }
  65. x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
  66. with self.assertRaises(KeyError):
  67. x.get(propagate=True)
  68. self.assertFalse(x.backend.wait_for.called)
  69. x.parent = EagerResult(uuid(), 42, states.SUCCESS)
  70. self.assertEqual(x.get(propagate=True), 84)
  71. self.assertTrue(x.backend.wait_for.called)
  72. def test_get_children(self):
  73. tid = uuid()
  74. x = self.app.AsyncResult(tid)
  75. child = [self.app.AsyncResult(uuid()).as_tuple()
  76. for i in range(10)]
  77. x._cache = {'children': child}
  78. self.assertTrue(x.children)
  79. self.assertEqual(len(x.children), 10)
  80. x._cache = {'status': states.SUCCESS}
  81. x.backend._cache[tid] = {'result': None}
  82. self.assertIsNone(x.children)
  83. def test_build_graph_get_leaf_collect(self):
  84. x = self.app.AsyncResult('1')
  85. x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
  86. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  87. x.iterdeps = Mock()
  88. x.iterdeps.return_value = (
  89. (None, x),
  90. (x, c[0]),
  91. (c[0], c[1]),
  92. (c[1], c[2])
  93. )
  94. x.backend.READY_STATES = states.READY_STATES
  95. self.assertTrue(x.graph)
  96. self.assertIs(x.get_leaf(), 2)
  97. it = x.collect()
  98. self.assertListEqual(list(it), [
  99. (x, None),
  100. (c[0], 0),
  101. (c[1], 1),
  102. (c[2], 2),
  103. ])
  104. def test_iterdeps(self):
  105. x = self.app.AsyncResult('1')
  106. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  107. x._cache = {'status': states.SUCCESS, 'result': None, 'children': c}
  108. for child in c:
  109. child.backend = Mock()
  110. child.backend.get_children.return_value = []
  111. it = x.iterdeps()
  112. self.assertListEqual(list(it), [
  113. (None, x),
  114. (x, c[0]),
  115. (x, c[1]),
  116. (x, c[2]),
  117. ])
  118. x._cache = None
  119. x.ready = Mock()
  120. x.ready.return_value = False
  121. with self.assertRaises(IncompleteStream):
  122. list(x.iterdeps())
  123. list(x.iterdeps(intermediate=True))
  124. def test_eq_not_implemented(self):
  125. self.assertFalse(self.app.AsyncResult('1') == object())
  126. @depends_on_current_app
  127. def test_reduce(self):
  128. a1 = self.app.AsyncResult('uuid', task_name=self.mytask.name)
  129. restored = pickle.loads(pickle.dumps(a1))
  130. self.assertEqual(restored.id, 'uuid')
  131. self.assertEqual(restored.task_name, self.mytask.name)
  132. a2 = self.app.AsyncResult('uuid')
  133. self.assertEqual(pickle.loads(pickle.dumps(a2)).id, 'uuid')
  134. def test_successful(self):
  135. ok_res = self.app.AsyncResult(self.task1['id'])
  136. nok_res = self.app.AsyncResult(self.task3['id'])
  137. nok_res2 = self.app.AsyncResult(self.task4['id'])
  138. self.assertTrue(ok_res.successful())
  139. self.assertFalse(nok_res.successful())
  140. self.assertFalse(nok_res2.successful())
  141. pending_res = self.app.AsyncResult(uuid())
  142. self.assertFalse(pending_res.successful())
  143. def test_str(self):
  144. ok_res = self.app.AsyncResult(self.task1['id'])
  145. ok2_res = self.app.AsyncResult(self.task2['id'])
  146. nok_res = self.app.AsyncResult(self.task3['id'])
  147. self.assertEqual(str(ok_res), self.task1['id'])
  148. self.assertEqual(str(ok2_res), self.task2['id'])
  149. self.assertEqual(str(nok_res), self.task3['id'])
  150. pending_id = uuid()
  151. pending_res = self.app.AsyncResult(pending_id)
  152. self.assertEqual(str(pending_res), pending_id)
  153. def test_repr(self):
  154. ok_res = self.app.AsyncResult(self.task1['id'])
  155. ok2_res = self.app.AsyncResult(self.task2['id'])
  156. nok_res = self.app.AsyncResult(self.task3['id'])
  157. self.assertEqual(repr(ok_res), '<AsyncResult: %s>' % (
  158. self.task1['id']))
  159. self.assertEqual(repr(ok2_res), '<AsyncResult: %s>' % (
  160. self.task2['id']))
  161. self.assertEqual(repr(nok_res), '<AsyncResult: %s>' % (
  162. self.task3['id']))
  163. pending_id = uuid()
  164. pending_res = self.app.AsyncResult(pending_id)
  165. self.assertEqual(repr(pending_res), '<AsyncResult: %s>' % (
  166. pending_id))
  167. def test_hash(self):
  168. self.assertEqual(hash(self.app.AsyncResult('x0w991')),
  169. hash(self.app.AsyncResult('x0w991')))
  170. self.assertNotEqual(hash(self.app.AsyncResult('x0w991')),
  171. hash(self.app.AsyncResult('x1w991')))
  172. def test_get_traceback(self):
  173. ok_res = self.app.AsyncResult(self.task1['id'])
  174. nok_res = self.app.AsyncResult(self.task3['id'])
  175. nok_res2 = self.app.AsyncResult(self.task4['id'])
  176. self.assertFalse(ok_res.traceback)
  177. self.assertTrue(nok_res.traceback)
  178. self.assertTrue(nok_res2.traceback)
  179. pending_res = self.app.AsyncResult(uuid())
  180. self.assertFalse(pending_res.traceback)
  181. def test_get(self):
  182. ok_res = self.app.AsyncResult(self.task1['id'])
  183. ok2_res = self.app.AsyncResult(self.task2['id'])
  184. nok_res = self.app.AsyncResult(self.task3['id'])
  185. nok2_res = self.app.AsyncResult(self.task4['id'])
  186. self.assertEqual(ok_res.get(), 'the')
  187. self.assertEqual(ok2_res.get(), 'quick')
  188. with self.assertRaises(KeyError):
  189. nok_res.get()
  190. self.assertTrue(nok_res.get(propagate=False))
  191. self.assertIsInstance(nok2_res.result, KeyError)
  192. self.assertEqual(ok_res.info, 'the')
  193. def test_get_timeout(self):
  194. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  195. with self.assertRaises(TimeoutError):
  196. res.get(timeout=0.001)
  197. pending_res = self.app.AsyncResult(uuid())
  198. with patch('celery.result.time') as _time:
  199. with self.assertRaises(TimeoutError):
  200. pending_res.get(timeout=0.001, interval=0.001)
  201. _time.sleep.assert_called_with(0.001)
  202. def test_get_timeout_longer(self):
  203. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  204. with patch('celery.result.time') as _time:
  205. with self.assertRaises(TimeoutError):
  206. res.get(timeout=1, interval=1)
  207. _time.sleep.assert_called_with(1)
  208. def test_ready(self):
  209. oks = (self.app.AsyncResult(self.task1['id']),
  210. self.app.AsyncResult(self.task2['id']),
  211. self.app.AsyncResult(self.task3['id']))
  212. self.assertTrue(all(result.ready() for result in oks))
  213. self.assertFalse(self.app.AsyncResult(self.task4['id']).ready())
  214. self.assertFalse(self.app.AsyncResult(uuid()).ready())
  215. class test_ResultSet(AppCase):
  216. def test_resultset_repr(self):
  217. self.assertTrue(repr(self.app.ResultSet(
  218. [self.app.AsyncResult(t) for t in ['1', '2', '3']])))
  219. def test_eq_other(self):
  220. self.assertFalse(self.app.ResultSet([1, 3, 3]) == 1)
  221. self.assertTrue(self.app.ResultSet([1]) == self.app.ResultSet([1]))
  222. def test_get(self):
  223. x = self.app.ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]])
  224. b = x.results[0].backend = Mock()
  225. b.supports_native_join = False
  226. x.join_native = Mock()
  227. x.join = Mock()
  228. x.get()
  229. self.assertTrue(x.join.called)
  230. b.supports_native_join = True
  231. x.get()
  232. self.assertTrue(x.join_native.called)
  233. def test_get_empty(self):
  234. x = self.app.ResultSet([])
  235. self.assertIsNone(x.supports_native_join)
  236. x.join = Mock(name='join')
  237. x.get()
  238. self.assertTrue(x.join.called)
  239. def test_add(self):
  240. x = self.app.ResultSet([1])
  241. x.add(2)
  242. self.assertEqual(len(x), 2)
  243. x.add(2)
  244. self.assertEqual(len(x), 2)
  245. @contextmanager
  246. def dummy_copy(self):
  247. with patch('celery.result.copy') as copy:
  248. def passt(arg):
  249. return arg
  250. copy.side_effect = passt
  251. yield
  252. def test_iterate_respects_subpolling_interval(self):
  253. r1 = self.app.AsyncResult(uuid())
  254. r2 = self.app.AsyncResult(uuid())
  255. backend = r1.backend = r2.backend = Mock()
  256. backend.subpolling_interval = 10
  257. ready = r1.ready = r2.ready = Mock()
  258. def se(*args, **kwargs):
  259. ready.side_effect = KeyError()
  260. return False
  261. ready.return_value = False
  262. ready.side_effect = se
  263. x = self.app.ResultSet([r1, r2])
  264. with self.dummy_copy():
  265. with patch('celery.result.time') as _time:
  266. with self.assertPendingDeprecation():
  267. with self.assertRaises(KeyError):
  268. list(x.iterate())
  269. _time.sleep.assert_called_with(10)
  270. backend.subpolling_interval = 0
  271. with patch('celery.result.time') as _time:
  272. with self.assertPendingDeprecation():
  273. with self.assertRaises(KeyError):
  274. ready.return_value = False
  275. ready.side_effect = se
  276. list(x.iterate())
  277. self.assertFalse(_time.sleep.called)
  278. def test_times_out(self):
  279. r1 = self.app.AsyncResult(uuid)
  280. r1.ready = Mock()
  281. r1.ready.return_value = False
  282. x = self.app.ResultSet([r1])
  283. with self.dummy_copy():
  284. with patch('celery.result.time'):
  285. with self.assertPendingDeprecation():
  286. with self.assertRaises(TimeoutError):
  287. list(x.iterate(timeout=1))
  288. def test_add_discard(self):
  289. x = self.app.ResultSet([])
  290. x.add(self.app.AsyncResult('1'))
  291. self.assertIn(self.app.AsyncResult('1'), x.results)
  292. x.discard(self.app.AsyncResult('1'))
  293. x.discard(self.app.AsyncResult('1'))
  294. x.discard('1')
  295. self.assertNotIn(self.app.AsyncResult('1'), x.results)
  296. x.update([self.app.AsyncResult('2')])
  297. def test_clear(self):
  298. x = self.app.ResultSet([])
  299. r = x.results
  300. x.clear()
  301. self.assertIs(x.results, r)
  302. class MockAsyncResultFailure(AsyncResult):
  303. @property
  304. def result(self):
  305. return KeyError('baz')
  306. @property
  307. def state(self):
  308. return states.FAILURE
  309. def get(self, propagate=True, **kwargs):
  310. if propagate:
  311. raise self.result
  312. return self.result
  313. class MockAsyncResultSuccess(AsyncResult):
  314. forgotten = False
  315. def forget(self):
  316. self.forgotten = True
  317. @property
  318. def result(self):
  319. return 42
  320. @property
  321. def state(self):
  322. return states.SUCCESS
  323. def get(self, **kwargs):
  324. return self.result
  325. class SimpleBackend(object):
  326. ids = []
  327. def __init__(self, ids=[]):
  328. self.ids = ids
  329. def get_many(self, *args, **kwargs):
  330. return ((id, {'result': i, 'status': states.SUCCESS})
  331. for i, id in enumerate(self.ids))
  332. class test_TaskSetResult(AppCase):
  333. def setup(self):
  334. self.size = 10
  335. self.ts = TaskSetResult(uuid(), make_mock_group(self.app, self.size))
  336. def test_total(self):
  337. self.assertEqual(self.ts.total, self.size)
  338. def test_compat_properties(self):
  339. self.assertEqual(self.ts.taskset_id, self.ts.id)
  340. self.ts.taskset_id = 'foo'
  341. self.assertEqual(self.ts.taskset_id, 'foo')
  342. def test_compat_subtasks_kwarg(self):
  343. x = TaskSetResult(uuid(), subtasks=[1, 2, 3])
  344. self.assertEqual(x.results, [1, 2, 3])
  345. def test_itersubtasks(self):
  346. it = self.ts.itersubtasks()
  347. for i, t in enumerate(it):
  348. self.assertEqual(t.get(), i)
  349. class test_GroupResult(AppCase):
  350. def setup(self):
  351. self.size = 10
  352. self.ts = self.app.GroupResult(
  353. uuid(), make_mock_group(self.app, self.size),
  354. )
  355. @depends_on_current_app
  356. def test_is_pickleable(self):
  357. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  358. self.assertEqual(pickle.loads(pickle.dumps(ts)), ts)
  359. ts2 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  360. self.assertEqual(pickle.loads(pickle.dumps(ts2)), ts2)
  361. def test_len(self):
  362. self.assertEqual(len(self.ts), self.size)
  363. def test_eq_other(self):
  364. self.assertFalse(self.ts == 1)
  365. @depends_on_current_app
  366. def test_reduce(self):
  367. self.assertTrue(pickle.loads(pickle.dumps(self.ts)))
  368. def test_iterate_raises(self):
  369. ar = MockAsyncResultFailure(uuid(), app=self.app)
  370. ts = self.app.GroupResult(uuid(), [ar])
  371. with self.assertPendingDeprecation():
  372. it = ts.iterate()
  373. with self.assertRaises(KeyError):
  374. next(it)
  375. def test_forget(self):
  376. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  377. MockAsyncResultSuccess(uuid(), app=self.app)]
  378. ts = self.app.GroupResult(uuid(), subs)
  379. ts.forget()
  380. for sub in subs:
  381. self.assertTrue(sub.forgotten)
  382. def test_getitem(self):
  383. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  384. MockAsyncResultSuccess(uuid(), app=self.app)]
  385. ts = self.app.GroupResult(uuid(), subs)
  386. self.assertIs(ts[0], subs[0])
  387. def test_save_restore(self):
  388. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  389. MockAsyncResultSuccess(uuid(), app=self.app)]
  390. ts = self.app.GroupResult(uuid(), subs)
  391. ts.save()
  392. with self.assertRaises(AttributeError):
  393. ts.save(backend=object())
  394. self.assertEqual(self.app.GroupResult.restore(ts.id).subtasks,
  395. ts.subtasks)
  396. ts.delete()
  397. self.assertIsNone(self.app.GroupResult.restore(ts.id))
  398. with self.assertRaises(AttributeError):
  399. self.app.GroupResult.restore(ts.id, backend=object())
  400. def test_join_native(self):
  401. backend = SimpleBackend()
  402. subtasks = [self.app.AsyncResult(uuid(), backend=backend)
  403. for i in range(10)]
  404. ts = self.app.GroupResult(uuid(), subtasks)
  405. ts.app.backend = backend
  406. backend.ids = [subtask.id for subtask in subtasks]
  407. res = ts.join_native()
  408. self.assertEqual(res, list(range(10)))
  409. def test_join_native_raises(self):
  410. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  411. ts.iter_native = Mock()
  412. ts.iter_native.return_value = iter([
  413. (uuid(), {'status': states.FAILURE, 'result': KeyError()})
  414. ])
  415. with self.assertRaises(KeyError):
  416. ts.join_native(propagate=True)
  417. def test_failed_join_report(self):
  418. res = Mock()
  419. ts = self.app.GroupResult(uuid(), [res])
  420. res.state = states.FAILURE
  421. res.backend.is_cached.return_value = True
  422. self.assertIs(next(ts._failed_join_report()), res)
  423. res.backend.is_cached.return_value = False
  424. with self.assertRaises(StopIteration):
  425. next(ts._failed_join_report())
  426. def test_repr(self):
  427. self.assertTrue(repr(
  428. self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  429. ))
  430. def test_children_is_results(self):
  431. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  432. self.assertIs(ts.children, ts.results)
  433. def test_iter_native(self):
  434. backend = SimpleBackend()
  435. subtasks = [self.app.AsyncResult(uuid(), backend=backend)
  436. for i in range(10)]
  437. ts = self.app.GroupResult(uuid(), subtasks)
  438. ts.app.backend = backend
  439. backend.ids = [subtask.id for subtask in subtasks]
  440. self.assertEqual(len(list(ts.iter_native())), 10)
  441. def test_iterate_yields(self):
  442. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  443. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  444. ts = self.app.GroupResult(uuid(), [ar, ar2])
  445. with self.assertPendingDeprecation():
  446. it = ts.iterate()
  447. self.assertEqual(next(it), 42)
  448. self.assertEqual(next(it), 42)
  449. def test_iterate_eager(self):
  450. ar1 = EagerResult(uuid(), 42, states.SUCCESS)
  451. ar2 = EagerResult(uuid(), 42, states.SUCCESS)
  452. ts = self.app.GroupResult(uuid(), [ar1, ar2])
  453. with self.assertPendingDeprecation():
  454. it = ts.iterate()
  455. self.assertEqual(next(it), 42)
  456. self.assertEqual(next(it), 42)
  457. def test_join_timeout(self):
  458. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  459. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  460. ar3 = self.app.AsyncResult(uuid())
  461. ts = self.app.GroupResult(uuid(), [ar, ar2, ar3])
  462. with self.assertRaises(TimeoutError):
  463. ts.join(timeout=0.0000001)
  464. ar4 = self.app.AsyncResult(uuid())
  465. ar4.get = Mock()
  466. ts2 = self.app.GroupResult(uuid(), [ar4])
  467. self.assertTrue(ts2.join(timeout=0.1))
  468. def test_iter_native_when_empty_group(self):
  469. ts = self.app.GroupResult(uuid(), [])
  470. self.assertListEqual(list(ts.iter_native()), [])
  471. def test_iterate_simple(self):
  472. with self.assertPendingDeprecation():
  473. it = self.ts.iterate()
  474. results = sorted(list(it))
  475. self.assertListEqual(results, list(range(self.size)))
  476. def test___iter__(self):
  477. self.assertListEqual(list(iter(self.ts)), self.ts.results)
  478. def test_join(self):
  479. joined = self.ts.join()
  480. self.assertListEqual(joined, list(range(self.size)))
  481. def test_successful(self):
  482. self.assertTrue(self.ts.successful())
  483. def test_failed(self):
  484. self.assertFalse(self.ts.failed())
  485. def test_waiting(self):
  486. self.assertFalse(self.ts.waiting())
  487. def test_ready(self):
  488. self.assertTrue(self.ts.ready())
  489. def test_completed_count(self):
  490. self.assertEqual(self.ts.completed_count(), len(self.ts))
  491. class test_pending_AsyncResult(AppCase):
  492. def setup(self):
  493. self.task = self.app.AsyncResult(uuid())
  494. def test_result(self):
  495. self.assertIsNone(self.task.result)
  496. class test_failed_AsyncResult(test_GroupResult):
  497. def setup(self):
  498. self.app.conf.CELERY_RESULT_SERIALIZER = 'pickle'
  499. self.size = 11
  500. subtasks = make_mock_group(self.app, 10)
  501. failed = mock_task('ts11', states.FAILURE, KeyError('Baz'))
  502. save_result(self.app, failed)
  503. failed_res = self.app.AsyncResult(failed['id'])
  504. self.ts = self.app.GroupResult(uuid(), subtasks + [failed_res])
  505. def test_completed_count(self):
  506. self.assertEqual(self.ts.completed_count(), len(self.ts) - 1)
  507. def test_iterate_simple(self):
  508. with self.assertPendingDeprecation():
  509. it = self.ts.iterate()
  510. def consume():
  511. return list(it)
  512. with self.assertRaises(KeyError):
  513. consume()
  514. def test_join(self):
  515. with self.assertRaises(KeyError):
  516. self.ts.join()
  517. def test_successful(self):
  518. self.assertFalse(self.ts.successful())
  519. def test_failed(self):
  520. self.assertTrue(self.ts.failed())
  521. class test_pending_Group(AppCase):
  522. def setup(self):
  523. self.ts = self.app.GroupResult(
  524. uuid(), [self.app.AsyncResult(uuid()),
  525. self.app.AsyncResult(uuid())])
  526. def test_completed_count(self):
  527. self.assertEqual(self.ts.completed_count(), 0)
  528. def test_ready(self):
  529. self.assertFalse(self.ts.ready())
  530. def test_waiting(self):
  531. self.assertTrue(self.ts.waiting())
  532. def x_join(self):
  533. with self.assertRaises(TimeoutError):
  534. self.ts.join(timeout=0.001)
  535. def x_join_longer(self):
  536. with self.assertRaises(TimeoutError):
  537. self.ts.join(timeout=1)
  538. class test_EagerResult(AppCase):
  539. def setup(self):
  540. @self.app.task(shared=False)
  541. def raising(x, y):
  542. raise KeyError(x, y)
  543. self.raising = raising
  544. def test_wait_raises(self):
  545. res = self.raising.apply(args=[3, 3])
  546. with self.assertRaises(KeyError):
  547. res.wait()
  548. self.assertTrue(res.wait(propagate=False))
  549. def test_wait(self):
  550. res = EagerResult('x', 'x', states.RETRY)
  551. res.wait()
  552. self.assertEqual(res.state, states.RETRY)
  553. self.assertEqual(res.status, states.RETRY)
  554. def test_forget(self):
  555. res = EagerResult('x', 'x', states.RETRY)
  556. res.forget()
  557. def test_revoke(self):
  558. res = self.raising.apply(args=[3, 3])
  559. self.assertFalse(res.revoke())
  560. class test_tuples(AppCase):
  561. def test_AsyncResult(self):
  562. x = self.app.AsyncResult(uuid())
  563. self.assertEqual(x, result_from_tuple(x.as_tuple(), self.app))
  564. self.assertEqual(x, result_from_tuple(x, self.app))
  565. def test_with_parent(self):
  566. x = self.app.AsyncResult(uuid())
  567. x.parent = self.app.AsyncResult(uuid())
  568. y = result_from_tuple(x.as_tuple(), self.app)
  569. self.assertEqual(y, x)
  570. self.assertEqual(y.parent, x.parent)
  571. self.assertIsInstance(y.parent, AsyncResult)
  572. def test_compat(self):
  573. uid = uuid()
  574. x = result_from_tuple([uid, []], app=self.app)
  575. self.assertEqual(x.id, uid)
  576. def test_GroupResult(self):
  577. x = self.app.GroupResult(
  578. uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
  579. )
  580. self.assertEqual(x, result_from_tuple(x.as_tuple(), self.app))
  581. self.assertEqual(x, result_from_tuple(x, self.app))