test_result.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716
  1. from __future__ import absolute_import
  2. from contextlib import contextmanager
  3. from mock import Mock, patch
  4. from celery import states
  5. from celery.exceptions import IncompleteStream, TimeoutError
  6. from celery.five import range
  7. from celery.result import (
  8. AsyncResult,
  9. EagerResult,
  10. TaskSetResult,
  11. ResultSet,
  12. GroupResult,
  13. from_serializable,
  14. )
  15. from celery.task import task
  16. from celery.task.base import Task
  17. from celery.utils import uuid
  18. from celery.utils.serialization import pickle
  19. from celery.tests.case import AppCase
  20. from celery.tests.case import skip_if_quick
  21. def mock_task(name, state, result):
  22. return dict(id=uuid(), name=name, state=state, result=result)
  23. def save_result(app, task):
  24. traceback = 'Some traceback'
  25. if task['state'] == states.SUCCESS:
  26. app.backend.mark_as_done(task['id'], task['result'])
  27. elif task['state'] == states.RETRY:
  28. app.backend.mark_as_retry(
  29. task['id'], task['result'], traceback=traceback,
  30. )
  31. else:
  32. app.backend.mark_as_failure(
  33. task['id'], task['result'], traceback=traceback,
  34. )
  35. def make_mock_group(app, size=10):
  36. tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)]
  37. [save_result(app, task) for task in tasks]
  38. return [app.AsyncResult(task['id']) for task in tasks]
  39. class test_AsyncResult(AppCase):
  40. def setup(self):
  41. self.task1 = mock_task('task1', states.SUCCESS, 'the')
  42. self.task2 = mock_task('task2', states.SUCCESS, 'quick')
  43. self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))
  44. self.task4 = mock_task('task3', states.RETRY, KeyError('red'))
  45. for task in (self.task1, self.task2, self.task3, self.task4):
  46. save_result(self.app, task)
  47. @self.app.task()
  48. def mytask():
  49. pass
  50. self.mytask = mytask
  51. def test_compat_properties(self):
  52. x = self.app.AsyncResult('1')
  53. self.assertEqual(x.task_id, x.id)
  54. x.task_id = '2'
  55. self.assertEqual(x.id, '2')
  56. def test_children(self):
  57. x = self.app.AsyncResult('1')
  58. children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  59. x.backend = Mock()
  60. x.backend.get_children.return_value = children
  61. x.backend.READY_STATES = states.READY_STATES
  62. self.assertTrue(x.children)
  63. self.assertEqual(len(x.children), 3)
  64. def test_propagates_for_parent(self):
  65. x = self.app.AsyncResult(uuid())
  66. x.backend = Mock()
  67. x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
  68. with self.assertRaises(KeyError):
  69. x.get(propagate=True)
  70. self.assertFalse(x.backend.wait_for.called)
  71. x.parent = EagerResult(uuid(), 42, states.SUCCESS)
  72. x.get(propagate=True)
  73. self.assertTrue(x.backend.wait_for.called)
  74. def test_get_children(self):
  75. tid = uuid()
  76. x = self.app.AsyncResult(tid)
  77. child = [self.app.AsyncResult(uuid()).serializable()
  78. for i in range(10)]
  79. x.backend._cache[tid] = {'children': child}
  80. self.assertTrue(x.children)
  81. self.assertEqual(len(x.children), 10)
  82. x.backend._cache[tid] = {'result': None}
  83. self.assertIsNone(x.children)
  84. def test_build_graph_get_leaf_collect(self):
  85. x = self.app.AsyncResult('1')
  86. x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
  87. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  88. x.iterdeps = Mock()
  89. x.iterdeps.return_value = (
  90. (None, x),
  91. (x, c[0]),
  92. (c[0], c[1]),
  93. (c[1], c[2])
  94. )
  95. x.backend.READY_STATES = states.READY_STATES
  96. self.assertTrue(x.graph)
  97. self.assertIs(x.get_leaf(), 2)
  98. it = x.collect()
  99. self.assertListEqual(list(it), [
  100. (x, None),
  101. (c[0], 0),
  102. (c[1], 1),
  103. (c[2], 2),
  104. ])
  105. def test_iterdeps(self):
  106. x = self.app.AsyncResult('1')
  107. x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
  108. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  109. for child in c:
  110. child.backend = Mock()
  111. child.backend.get_children.return_value = []
  112. x.backend.get_children = Mock()
  113. x.backend.get_children.return_value = c
  114. it = x.iterdeps()
  115. self.assertListEqual(list(it), [
  116. (None, x),
  117. (x, c[0]),
  118. (x, c[1]),
  119. (x, c[2]),
  120. ])
  121. x.backend._cache.pop('1')
  122. x.ready = Mock()
  123. x.ready.return_value = False
  124. with self.assertRaises(IncompleteStream):
  125. list(x.iterdeps())
  126. list(x.iterdeps(intermediate=True))
  127. def test_eq_not_implemented(self):
  128. self.assertFalse(self.app.AsyncResult('1') == object())
  129. def test_reduce(self):
  130. a1 = self.app.AsyncResult('uuid', task_name=self.mytask.name)
  131. restored = pickle.loads(pickle.dumps(a1))
  132. self.assertEqual(restored.id, 'uuid')
  133. self.assertEqual(restored.task_name, self.mytask.name)
  134. a2 = self.app.AsyncResult('uuid')
  135. self.assertEqual(pickle.loads(pickle.dumps(a2)).id, 'uuid')
  136. def test_successful(self):
  137. ok_res = self.app.AsyncResult(self.task1['id'])
  138. nok_res = self.app.AsyncResult(self.task3['id'])
  139. nok_res2 = self.app.AsyncResult(self.task4['id'])
  140. self.assertTrue(ok_res.successful())
  141. self.assertFalse(nok_res.successful())
  142. self.assertFalse(nok_res2.successful())
  143. pending_res = self.app.AsyncResult(uuid())
  144. self.assertFalse(pending_res.successful())
  145. def test_str(self):
  146. ok_res = self.app.AsyncResult(self.task1['id'])
  147. ok2_res = self.app.AsyncResult(self.task2['id'])
  148. nok_res = self.app.AsyncResult(self.task3['id'])
  149. self.assertEqual(str(ok_res), self.task1['id'])
  150. self.assertEqual(str(ok2_res), self.task2['id'])
  151. self.assertEqual(str(nok_res), self.task3['id'])
  152. pending_id = uuid()
  153. pending_res = self.app.AsyncResult(pending_id)
  154. self.assertEqual(str(pending_res), pending_id)
  155. def test_repr(self):
  156. ok_res = self.app.AsyncResult(self.task1['id'])
  157. ok2_res = self.app.AsyncResult(self.task2['id'])
  158. nok_res = self.app.AsyncResult(self.task3['id'])
  159. self.assertEqual(repr(ok_res), '<AsyncResult: %s>' % (
  160. self.task1['id']))
  161. self.assertEqual(repr(ok2_res), '<AsyncResult: %s>' % (
  162. self.task2['id']))
  163. self.assertEqual(repr(nok_res), '<AsyncResult: %s>' % (
  164. self.task3['id']))
  165. pending_id = uuid()
  166. pending_res = self.app.AsyncResult(pending_id)
  167. self.assertEqual(repr(pending_res), '<AsyncResult: %s>' % (
  168. pending_id))
  169. def test_hash(self):
  170. self.assertEqual(hash(self.app.AsyncResult('x0w991')),
  171. hash(self.app.AsyncResult('x0w991')))
  172. self.assertNotEqual(hash(self.app.AsyncResult('x0w991')),
  173. hash(self.app.AsyncResult('x1w991')))
  174. def test_get_traceback(self):
  175. ok_res = self.app.AsyncResult(self.task1['id'])
  176. nok_res = self.app.AsyncResult(self.task3['id'])
  177. nok_res2 = self.app.AsyncResult(self.task4['id'])
  178. self.assertFalse(ok_res.traceback)
  179. self.assertTrue(nok_res.traceback)
  180. self.assertTrue(nok_res2.traceback)
  181. pending_res = self.app.AsyncResult(uuid())
  182. self.assertFalse(pending_res.traceback)
  183. def test_get(self):
  184. ok_res = self.app.AsyncResult(self.task1['id'])
  185. ok2_res = self.app.AsyncResult(self.task2['id'])
  186. nok_res = self.app.AsyncResult(self.task3['id'])
  187. nok2_res = self.app.AsyncResult(self.task4['id'])
  188. self.assertEqual(ok_res.get(), 'the')
  189. self.assertEqual(ok2_res.get(), 'quick')
  190. with self.assertRaises(KeyError):
  191. nok_res.get()
  192. self.assertTrue(nok_res.get(propagate=False))
  193. self.assertIsInstance(nok2_res.result, KeyError)
  194. self.assertEqual(ok_res.info, 'the')
  195. def test_get_timeout(self):
  196. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  197. with self.assertRaises(TimeoutError):
  198. res.get(timeout=0.001)
  199. pending_res = self.app.AsyncResult(uuid())
  200. with patch('celery.result.time') as _time:
  201. with self.assertRaises(TimeoutError):
  202. pending_res.get(timeout=0.001, interval=0.001)
  203. _time.sleep.assert_called_with(0.001)
  204. def test_get_timeout_longer(self):
  205. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  206. with patch('celery.result.time') as _time:
  207. with self.assertRaises(TimeoutError):
  208. res.get(timeout=1, interval=1)
  209. _time.sleep.assert_called_with(1)
  210. def test_ready(self):
  211. oks = (self.app.AsyncResult(self.task1['id']),
  212. self.app.AsyncResult(self.task2['id']),
  213. self.app.AsyncResult(self.task3['id']))
  214. self.assertTrue(all(result.ready() for result in oks))
  215. self.assertFalse(self.app.AsyncResult(self.task4['id']).ready())
  216. self.assertFalse(self.app.AsyncResult(uuid()).ready())
  217. class test_ResultSet(AppCase):
  218. def test_resultset_repr(self):
  219. self.assertTrue(repr(ResultSet(
  220. [self.app.AsyncResult(t) for t in ['1', '2', '3']])))
  221. def test_eq_other(self):
  222. self.assertFalse(ResultSet([1, 3, 3]) == 1)
  223. self.assertTrue(ResultSet([1]) == ResultSet([1]))
  224. def test_get(self):
  225. x = ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]])
  226. b = x.results[0].backend = Mock()
  227. b.supports_native_join = False
  228. x.join_native = Mock()
  229. x.join = Mock()
  230. x.get()
  231. self.assertTrue(x.join.called)
  232. b.supports_native_join = True
  233. x.get()
  234. self.assertTrue(x.join_native.called)
  235. def test_add(self):
  236. x = ResultSet([1])
  237. x.add(2)
  238. self.assertEqual(len(x), 2)
  239. x.add(2)
  240. self.assertEqual(len(x), 2)
  241. @contextmanager
  242. def dummy_copy(self):
  243. with patch('celery.result.copy') as copy:
  244. def passt(arg):
  245. return arg
  246. copy.side_effect = passt
  247. yield
  248. def test_iterate_respects_subpolling_interval(self):
  249. r1 = self.app.AsyncResult(uuid())
  250. r2 = self.app.AsyncResult(uuid())
  251. backend = r1.backend = r2.backend = Mock()
  252. backend.subpolling_interval = 10
  253. ready = r1.ready = r2.ready = Mock()
  254. def se(*args, **kwargs):
  255. ready.side_effect = KeyError()
  256. return False
  257. ready.return_value = False
  258. ready.side_effect = se
  259. x = ResultSet([r1, r2])
  260. with self.dummy_copy():
  261. with patch('celery.result.time') as _time:
  262. with self.assertRaises(KeyError):
  263. list(x.iterate())
  264. _time.sleep.assert_called_with(10)
  265. backend.subpolling_interval = 0
  266. with patch('celery.result.time') as _time:
  267. with self.assertRaises(KeyError):
  268. ready.return_value = False
  269. ready.side_effect = se
  270. list(x.iterate())
  271. self.assertFalse(_time.sleep.called)
  272. def test_times_out(self):
  273. r1 = self.app.AsyncResult(uuid)
  274. r1.ready = Mock()
  275. r1.ready.return_value = False
  276. x = ResultSet([r1])
  277. with self.dummy_copy():
  278. with patch('celery.result.time'):
  279. with self.assertRaises(TimeoutError):
  280. list(x.iterate(timeout=1))
  281. def test_add_discard(self):
  282. x = ResultSet([])
  283. x.add(self.app.AsyncResult('1'))
  284. self.assertIn(self.app.AsyncResult('1'), x.results)
  285. x.discard(self.app.AsyncResult('1'))
  286. x.discard(self.app.AsyncResult('1'))
  287. x.discard('1')
  288. self.assertNotIn(self.app.AsyncResult('1'), x.results)
  289. x.update([self.app.AsyncResult('2')])
  290. def test_clear(self):
  291. x = ResultSet([])
  292. r = x.results
  293. x.clear()
  294. self.assertIs(x.results, r)
  295. class MockAsyncResultFailure(AsyncResult):
  296. @property
  297. def result(self):
  298. return KeyError('baz')
  299. @property
  300. def state(self):
  301. return states.FAILURE
  302. def get(self, propagate=True, **kwargs):
  303. if propagate:
  304. raise self.result
  305. return self.result
  306. class MockAsyncResultSuccess(AsyncResult):
  307. forgotten = False
  308. def forget(self):
  309. self.forgotten = True
  310. @property
  311. def result(self):
  312. return 42
  313. @property
  314. def state(self):
  315. return states.SUCCESS
  316. def get(self, **kwargs):
  317. return self.result
  318. class SimpleBackend(object):
  319. ids = []
  320. def __init__(self, ids=[]):
  321. self.ids = ids
  322. def get_many(self, *args, **kwargs):
  323. return ((id, {'result': i, 'status': states.SUCCESS})
  324. for i, id in enumerate(self.ids))
  325. class test_TaskSetResult(AppCase):
  326. def setup(self):
  327. self.size = 10
  328. self.ts = TaskSetResult(uuid(), make_mock_group(self.app, self.size))
  329. def test_total(self):
  330. self.assertEqual(self.ts.total, self.size)
  331. def test_compat_properties(self):
  332. self.assertEqual(self.ts.taskset_id, self.ts.id)
  333. self.ts.taskset_id = 'foo'
  334. self.assertEqual(self.ts.taskset_id, 'foo')
  335. def test_compat_subtasks_kwarg(self):
  336. x = TaskSetResult(uuid(), subtasks=[1, 2, 3])
  337. self.assertEqual(x.results, [1, 2, 3])
  338. def test_itersubtasks(self):
  339. it = self.ts.itersubtasks()
  340. for i, t in enumerate(it):
  341. self.assertEqual(t.get(), i)
  342. class test_GroupResult(AppCase):
  343. def setup(self):
  344. self.size = 10
  345. self.ts = self.app.GroupResult(
  346. uuid(), make_mock_group(self.app, self.size),
  347. )
  348. def test_is_pickleable(self):
  349. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  350. self.assertEqual(pickle.loads(pickle.dumps(ts)), ts)
  351. ts2 = GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  352. self.assertEqual(pickle.loads(pickle.dumps(ts2)), ts2)
  353. def test_len(self):
  354. self.assertEqual(len(self.ts), self.size)
  355. def test_eq_other(self):
  356. self.assertFalse(self.ts == 1)
  357. def test_reduce(self):
  358. self.assertTrue(pickle.loads(pickle.dumps(self.ts)))
  359. def test_iterate_raises(self):
  360. ar = MockAsyncResultFailure(uuid(), app=self.app)
  361. ts = self.app.GroupResult(uuid(), [ar])
  362. it = ts.iterate()
  363. with self.assertRaises(KeyError):
  364. next(it)
  365. def test_forget(self):
  366. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  367. MockAsyncResultSuccess(uuid(), app=self.app)]
  368. ts = self.app.GroupResult(uuid(), subs)
  369. ts.forget()
  370. for sub in subs:
  371. self.assertTrue(sub.forgotten)
  372. def test_getitem(self):
  373. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  374. MockAsyncResultSuccess(uuid(), app=self.app)]
  375. ts = self.app.GroupResult(uuid(), subs)
  376. self.assertIs(ts[0], subs[0])
  377. def test_save_restore(self):
  378. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  379. MockAsyncResultSuccess(uuid(), app=self.app)]
  380. ts = self.app.GroupResult(uuid(), subs)
  381. ts.save()
  382. with self.assertRaises(AttributeError):
  383. ts.save(backend=object())
  384. self.assertEqual(self.app.GroupResult.restore(ts.id).subtasks,
  385. ts.subtasks)
  386. ts.delete()
  387. self.assertIsNone(self.app.GroupResult.restore(ts.id))
  388. with self.assertRaises(AttributeError):
  389. self.app.GroupResult.restore(ts.id, backend=object())
  390. def test_join_native(self):
  391. backend = SimpleBackend()
  392. subtasks = [self.app.AsyncResult(uuid(), backend=backend)
  393. for i in range(10)]
  394. ts = self.app.GroupResult(uuid(), subtasks)
  395. backend.ids = [subtask.id for subtask in subtasks]
  396. res = ts.join_native()
  397. self.assertEqual(res, list(range(10)))
  398. def test_join_native_raises(self):
  399. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  400. ts.iter_native = Mock()
  401. ts.iter_native.return_value = iter([
  402. (uuid(), {'status': states.FAILURE, 'result': KeyError()})
  403. ])
  404. with self.assertRaises(KeyError):
  405. ts.join_native(propagate=True)
  406. def test_failed_join_report(self):
  407. res = Mock()
  408. ts = self.app.GroupResult(uuid(), [res])
  409. res.state = states.FAILURE
  410. res.backend.is_cached.return_value = True
  411. self.assertIs(next(ts._failed_join_report()), res)
  412. res.backend.is_cached.return_value = False
  413. with self.assertRaises(StopIteration):
  414. next(ts._failed_join_report())
  415. def test_repr(self):
  416. self.assertTrue(repr(
  417. self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  418. ))
  419. def test_children_is_results(self):
  420. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  421. self.assertIs(ts.children, ts.results)
  422. def test_iter_native(self):
  423. backend = SimpleBackend()
  424. subtasks = [self.app.AsyncResult(uuid(), backend=backend)
  425. for i in range(10)]
  426. ts = self.app.GroupResult(uuid(), subtasks)
  427. backend.ids = [subtask.id for subtask in subtasks]
  428. self.assertEqual(len(list(ts.iter_native())), 10)
  429. def test_iterate_yields(self):
  430. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  431. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  432. ts = self.app.GroupResult(uuid(), [ar, ar2])
  433. it = ts.iterate()
  434. self.assertEqual(next(it), 42)
  435. self.assertEqual(next(it), 42)
  436. def test_iterate_eager(self):
  437. ar1 = EagerResult(uuid(), 42, states.SUCCESS)
  438. ar2 = EagerResult(uuid(), 42, states.SUCCESS)
  439. ts = self.app.GroupResult(uuid(), [ar1, ar2])
  440. it = ts.iterate()
  441. self.assertEqual(next(it), 42)
  442. self.assertEqual(next(it), 42)
  443. def test_join_timeout(self):
  444. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  445. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  446. ar3 = self.app.AsyncResult(uuid())
  447. ts = self.app.GroupResult(uuid(), [ar, ar2, ar3])
  448. with self.assertRaises(TimeoutError):
  449. ts.join(timeout=0.0000001)
  450. ar4 = self.app.AsyncResult(uuid())
  451. ar4.get = Mock()
  452. ts2 = self.app.GroupResult(uuid(), [ar4])
  453. self.assertTrue(ts2.join(timeout=0.1))
  454. def test_iter_native_when_empty_group(self):
  455. ts = self.app.GroupResult(uuid(), [])
  456. self.assertListEqual(list(ts.iter_native()), [])
  457. def test_iterate_simple(self):
  458. it = self.ts.iterate()
  459. results = sorted(list(it))
  460. self.assertListEqual(results, list(range(self.size)))
  461. def test___iter__(self):
  462. self.assertListEqual(list(iter(self.ts)), self.ts.results)
  463. def test_join(self):
  464. joined = self.ts.join()
  465. self.assertListEqual(joined, list(range(self.size)))
  466. def test_successful(self):
  467. self.assertTrue(self.ts.successful())
  468. def test_failed(self):
  469. self.assertFalse(self.ts.failed())
  470. def test_waiting(self):
  471. self.assertFalse(self.ts.waiting())
  472. def test_ready(self):
  473. self.assertTrue(self.ts.ready())
  474. def test_completed_count(self):
  475. self.assertEqual(self.ts.completed_count(), len(self.ts))
  476. class test_pending_AsyncResult(AppCase):
  477. def setup(self):
  478. self.task = self.app.AsyncResult(uuid())
  479. def test_result(self):
  480. self.assertIsNone(self.task.result)
  481. class test_failed_AsyncResult(test_GroupResult):
  482. def setup(self):
  483. self.size = 11
  484. subtasks = make_mock_group(self.app, 10)
  485. failed = mock_task('ts11', states.FAILURE, KeyError('Baz'))
  486. save_result(self.app, failed)
  487. failed_res = self.app.AsyncResult(failed['id'])
  488. self.ts = self.app.GroupResult(uuid(), subtasks + [failed_res])
  489. def test_completed_count(self):
  490. self.assertEqual(self.ts.completed_count(), len(self.ts) - 1)
  491. def test_iterate_simple(self):
  492. it = self.ts.iterate()
  493. def consume():
  494. return list(it)
  495. with self.assertRaises(KeyError):
  496. consume()
  497. def test_join(self):
  498. with self.assertRaises(KeyError):
  499. self.ts.join()
  500. def test_successful(self):
  501. self.assertFalse(self.ts.successful())
  502. def test_failed(self):
  503. self.assertTrue(self.ts.failed())
  504. class test_pending_Group(AppCase):
  505. def setup(self):
  506. self.ts = self.app.GroupResult(
  507. uuid(), [self.app.AsyncResult(uuid()),
  508. self.app.AsyncResult(uuid())])
  509. def test_completed_count(self):
  510. self.assertEqual(self.ts.completed_count(), 0)
  511. def test_ready(self):
  512. self.assertFalse(self.ts.ready())
  513. def test_waiting(self):
  514. self.assertTrue(self.ts.waiting())
  515. def x_join(self):
  516. with self.assertRaises(TimeoutError):
  517. self.ts.join(timeout=0.001)
  518. @skip_if_quick
  519. def x_join_longer(self):
  520. with self.assertRaises(TimeoutError):
  521. self.ts.join(timeout=1)
  522. class test_EagerResult(AppCase):
  523. def setup(self):
  524. @self.app.task
  525. def raising(x, y):
  526. raise KeyError(x, y)
  527. self.raising = raising
  528. def test_wait_raises(self):
  529. res = self.raising.apply(args=[3, 3])
  530. with self.assertRaises(KeyError):
  531. res.wait()
  532. self.assertTrue(res.wait(propagate=False))
  533. def test_wait(self):
  534. res = EagerResult('x', 'x', states.RETRY)
  535. res.wait()
  536. self.assertEqual(res.state, states.RETRY)
  537. self.assertEqual(res.status, states.RETRY)
  538. def test_forget(self):
  539. res = EagerResult('x', 'x', states.RETRY)
  540. res.forget()
  541. def test_revoke(self):
  542. res = self.raising.apply(args=[3, 3])
  543. self.assertFalse(res.revoke())
  544. class test_serializable(AppCase):
  545. def test_AsyncResult(self):
  546. x = self.app.AsyncResult(uuid())
  547. self.assertEqual(x, from_serializable(x.serializable(), self.app))
  548. self.assertEqual(x, from_serializable(x, self.app))
  549. def test_with_parent(self):
  550. x = self.app.AsyncResult(uuid())
  551. x.parent = self.app.AsyncResult(uuid())
  552. y = from_serializable(x.serializable(), self.app)
  553. self.assertEqual(y, x)
  554. self.assertEqual(y.parent, x.parent)
  555. self.assertIsInstance(y.parent, AsyncResult)
  556. def test_compat(self):
  557. uid = uuid()
  558. x = from_serializable([uid, []])
  559. self.assertEqual(x.id, uid)
  560. def test_GroupResult(self):
  561. x = self.app.GroupResult(
  562. uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
  563. )
  564. self.assertEqual(x, from_serializable(x.serializable(), self.app))
  565. self.assertEqual(x, from_serializable(x, self.app))