| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015 | from __future__ import absolute_import, unicode_literalsimport copyimport tracebackfrom contextlib import contextmanagerimport pytestfrom case import Mock, call, patch, skipfrom celery import states, uuidfrom celery.backends.base import SyncBackendMixinfrom celery.exceptions import (CPendingDeprecationWarning,                               ImproperlyConfigured, IncompleteStream,                               TimeoutError)from celery.five import rangefrom celery.result import (AsyncResult, EagerResult, GroupResult, ResultSet,                           assert_will_not_block, result_from_tuple)from celery.utils.serialization import picklePYTRACEBACK = """\Traceback (most recent call last):  File "foo.py", line 2, in foofunc    don't matter  File "bar.py", line 3, in barfunc    don't matterDoesn't matter: really!\"""def mock_task(name, state, result, traceback=None):    return {        'id': uuid(), 'name': name, 'state': state,        'result': result, 'traceback': traceback,    }def save_result(app, task):    traceback = task.get('traceback') or 'Some traceback'    if task['state'] == states.SUCCESS:        app.backend.mark_as_done(task['id'], task['result'])    elif task['state'] == states.RETRY:        app.backend.mark_as_retry(            task['id'], task['result'], traceback=traceback,        )    else:        app.backend.mark_as_failure(            task['id'], task['result'], traceback=traceback,        )def make_mock_group(app, size=10):    tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)]    [save_result(app, task) for task in tasks]    return [app.AsyncResult(task['id']) for task in tasks]class _MockBackend:    def add_pending_result(self, *args, **kwargs):        return True    def wait_for_pending(self, *args, **kwargs):        return Trueclass test_AsyncResult:    def setup(self):        self.app.conf.result_cache_max = 100        self.app.conf.result_serializer = 'pickle'        self.task1 = mock_task('task1', states.SUCCESS, 'the')        self.task2 = mock_task('task2', states.SUCCESS, 'quick')        self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))        self.task4 = mock_task('task3', states.RETRY, KeyError('red'))        self.task5 = mock_task(            'task3', states.FAILURE, KeyError('blue'), PYTRACEBACK,        )        for task in (self.task1, self.task2,                     self.task3, self.task4, self.task5):            save_result(self.app, task)        @self.app.task(shared=False)        def mytask():            pass        self.mytask = mytask    def test_ignored_getter(self):        result = self.app.AsyncResult(uuid())        assert result.ignored is False        result.__delattr__('_ignored')        assert result.ignored is False    @patch('celery.result.task_join_will_block')    def test_assert_will_not_block(self, task_join_will_block):        task_join_will_block.return_value = True        with pytest.raises(RuntimeError):            assert_will_not_block()        task_join_will_block.return_value = False        assert_will_not_block()    @patch('celery.result.task_join_will_block')    def test_get_sync_subtask_option(self, task_join_will_block):        task_join_will_block.return_value = True        tid = uuid()        backend = _MockBackend()        res_subtask_async = AsyncResult(tid, backend=backend)        with pytest.raises(RuntimeError):            res_subtask_async.get()        res_subtask_async.get(disable_sync_subtasks=False)    def test_without_id(self):        with pytest.raises(ValueError):            AsyncResult(None, app=self.app)    def test_compat_properties(self):        x = self.app.AsyncResult('1')        assert x.task_id == x.id        x.task_id = '2'        assert x.id == '2'    @pytest.mark.usefixtures('depends_on_current_app')    def test_reduce_direct(self):        x = AsyncResult('1', app=self.app)        fun, args = x.__reduce__()        assert fun(*args) == x    def test_children(self):        x = self.app.AsyncResult('1')        children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]        x._cache = {'children': children, 'status': states.SUCCESS}        x.backend = Mock()        assert x.children        assert len(x.children) == 3    def test_propagates_for_parent(self):        x = self.app.AsyncResult(uuid())        x.backend = Mock(name='backend')        x.backend.get_task_meta.return_value = {}        x.backend.wait_for_pending.return_value = 84        x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)        with pytest.raises(KeyError):            x.get(propagate=True)        x.backend.wait_for_pending.assert_not_called()        x.parent = EagerResult(uuid(), 42, states.SUCCESS)        assert x.get(propagate=True) == 84        x.backend.wait_for_pending.assert_called()    def test_get_children(self):        tid = uuid()        x = self.app.AsyncResult(tid)        child = [self.app.AsyncResult(uuid()).as_tuple()                 for i in range(10)]        x._cache = {'children': child}        assert x.children        assert len(x.children) == 10        x._cache = {'status': states.SUCCESS}        x.backend._cache[tid] = {'result': None}        assert x.children is None    def test_build_graph_get_leaf_collect(self):        x = self.app.AsyncResult('1')        x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}        c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]        x.iterdeps = Mock()        x.iterdeps.return_value = (            (None, x),            (x, c[0]),            (c[0], c[1]),            (c[1], c[2])        )        x.backend.READY_STATES = states.READY_STATES        assert x.graph        assert x.get_leaf() is 2        it = x.collect()        assert list(it) == [            (x, None),            (c[0], 0),            (c[1], 1),            (c[2], 2),        ]    def test_iterdeps(self):        x = self.app.AsyncResult('1')        c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]        x._cache = {'status': states.SUCCESS, 'result': None, 'children': c}        for child in c:            child.backend = Mock()            child.backend.get_children.return_value = []        it = x.iterdeps()        assert list(it) == [            (None, x),            (x, c[0]),            (x, c[1]),            (x, c[2]),        ]        x._cache = None        x.ready = Mock()        x.ready.return_value = False        with pytest.raises(IncompleteStream):            list(x.iterdeps())        list(x.iterdeps(intermediate=True))    def test_eq_not_implemented(self):        assert self.app.AsyncResult('1') != object()    @pytest.mark.usefixtures('depends_on_current_app')    def test_reduce(self):        a1 = self.app.AsyncResult('uuid')        restored = pickle.loads(pickle.dumps(a1))        assert restored.id == 'uuid'        a2 = self.app.AsyncResult('uuid')        assert pickle.loads(pickle.dumps(a2)).id == 'uuid'    def test_maybe_set_cache_empty(self):        self.app.AsyncResult('uuid')._maybe_set_cache(None)    def test_set_cache__children(self):        r1 = self.app.AsyncResult('id1')        r2 = self.app.AsyncResult('id2')        r1._set_cache({'children': [r2.as_tuple()]})        assert r2 in r1.children    def test_successful(self):        ok_res = self.app.AsyncResult(self.task1['id'])        nok_res = self.app.AsyncResult(self.task3['id'])        nok_res2 = self.app.AsyncResult(self.task4['id'])        assert ok_res.successful()        assert not nok_res.successful()        assert not nok_res2.successful()        pending_res = self.app.AsyncResult(uuid())        assert not pending_res.successful()    def test_raising(self):        notb = self.app.AsyncResult(self.task3['id'])        withtb = self.app.AsyncResult(self.task5['id'])        with pytest.raises(KeyError):            notb.get()        try:            withtb.get()        except KeyError:            tb = traceback.format_exc()            assert '  File "foo.py", line 2, in foofunc' not in tb            assert '  File "bar.py", line 3, in barfunc' not in tb            assert 'KeyError:' in tb            assert "'blue'" in tb        else:            raise AssertionError('Did not raise KeyError.')    @skip.unless_module('tblib')    def test_raising_remote_tracebacks(self):        withtb = self.app.AsyncResult(self.task5['id'])        self.app.conf.task_remote_tracebacks = True        try:            withtb.get()        except KeyError:            tb = traceback.format_exc()            assert '  File "foo.py", line 2, in foofunc' in tb            assert '  File "bar.py", line 3, in barfunc' in tb            assert 'KeyError:' in tb            assert "'blue'" in tb        else:            raise AssertionError('Did not raise KeyError.')    def test_str(self):        ok_res = self.app.AsyncResult(self.task1['id'])        ok2_res = self.app.AsyncResult(self.task2['id'])        nok_res = self.app.AsyncResult(self.task3['id'])        assert str(ok_res) == self.task1['id']        assert str(ok2_res) == self.task2['id']        assert str(nok_res) == self.task3['id']        pending_id = uuid()        pending_res = self.app.AsyncResult(pending_id)        assert str(pending_res) == pending_id    def test_repr(self):        ok_res = self.app.AsyncResult(self.task1['id'])        ok2_res = self.app.AsyncResult(self.task2['id'])        nok_res = self.app.AsyncResult(self.task3['id'])        assert repr(ok_res) == '<AsyncResult: %s>' % (self.task1['id'],)        assert repr(ok2_res) == '<AsyncResult: %s>' % (self.task2['id'],)        assert repr(nok_res) == '<AsyncResult: %s>' % (self.task3['id'],)        pending_id = uuid()        pending_res = self.app.AsyncResult(pending_id)        assert repr(pending_res) == '<AsyncResult: %s>' % (pending_id,)    def test_hash(self):        assert (hash(self.app.AsyncResult('x0w991')) ==                hash(self.app.AsyncResult('x0w991')))        assert (hash(self.app.AsyncResult('x0w991')) !=                hash(self.app.AsyncResult('x1w991')))    def test_get_traceback(self):        ok_res = self.app.AsyncResult(self.task1['id'])        nok_res = self.app.AsyncResult(self.task3['id'])        nok_res2 = self.app.AsyncResult(self.task4['id'])        assert not ok_res.traceback        assert nok_res.traceback        assert nok_res2.traceback        pending_res = self.app.AsyncResult(uuid())        assert not pending_res.traceback    def test_get__backend_gives_None(self):        res = self.app.AsyncResult(self.task1['id'])        res.backend.wait_for = Mock(name='wait_for')        res.backend.wait_for.return_value = None        assert res.get() is None    def test_get(self):        ok_res = self.app.AsyncResult(self.task1['id'])        ok2_res = self.app.AsyncResult(self.task2['id'])        nok_res = self.app.AsyncResult(self.task3['id'])        nok2_res = self.app.AsyncResult(self.task4['id'])        callback = Mock(name='callback')        assert ok_res.get(callback=callback) == 'the'        callback.assert_called_with(ok_res.id, 'the')        assert ok2_res.get() == 'quick'        with pytest.raises(KeyError):            nok_res.get()        assert nok_res.get(propagate=False)        assert isinstance(nok2_res.result, KeyError)        assert ok_res.info == 'the'    def test_get_when_ignored(self):        result = self.app.AsyncResult(uuid())        result.ignored = True        # Does not block        assert result.get() is None    def test_eq_ne(self):        r1 = self.app.AsyncResult(self.task1['id'])        r2 = self.app.AsyncResult(self.task1['id'])        r3 = self.app.AsyncResult(self.task2['id'])        assert r1 == r2        assert r1 != r3        assert r1 == r2.id        assert r1 != r3.id    @pytest.mark.usefixtures('depends_on_current_app')    def test_reduce_restore(self):        r1 = self.app.AsyncResult(self.task1['id'])        fun, args = r1.__reduce__()        assert fun(*args) == r1    def test_get_timeout(self):        res = self.app.AsyncResult(self.task4['id'])  # has RETRY state        with pytest.raises(TimeoutError):            res.get(timeout=0.001)        pending_res = self.app.AsyncResult(uuid())        with patch('celery.result.time') as _time:            with pytest.raises(TimeoutError):                pending_res.get(timeout=0.001, interval=0.001)                _time.sleep.assert_called_with(0.001)    def test_get_timeout_longer(self):        res = self.app.AsyncResult(self.task4['id'])  # has RETRY state        with patch('celery.result.time') as _time:            with pytest.raises(TimeoutError):                res.get(timeout=1, interval=1)                _time.sleep.assert_called_with(1)    def test_ready(self):        oks = (self.app.AsyncResult(self.task1['id']),               self.app.AsyncResult(self.task2['id']),               self.app.AsyncResult(self.task3['id']))        assert all(result.ready() for result in oks)        assert not self.app.AsyncResult(self.task4['id']).ready()        assert not self.app.AsyncResult(uuid()).ready()    def test_del(self):        with patch('celery.result.AsyncResult.backend') as backend:            result = self.app.AsyncResult(self.task1['id'])            result_clone = copy.copy(result)            del result            assert backend.remove_pending_result.called_once_with(                result_clone            )        result = self.app.AsyncResult(self.task1['id'])        result.backend = None        del resultclass test_ResultSet:    def test_resultset_repr(self):        assert repr(self.app.ResultSet(            [self.app.AsyncResult(t) for t in ['1', '2', '3']]))    def test_eq_other(self):        assert self.app.ResultSet([            self.app.AsyncResult(t) for t in [1, 3, 3]]) != 1        rs1 = self.app.ResultSet([self.app.AsyncResult(1)])        rs2 = self.app.ResultSet([self.app.AsyncResult(1)])        assert rs1 == rs2    def test_get(self):        x = self.app.ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]])        b = x.results[0].backend = Mock()        b.supports_native_join = False        x.join_native = Mock()        x.join = Mock()        x.get()        x.join.assert_called()        b.supports_native_join = True        x.get()        x.join_native.assert_called()    def test_eq_ne(self):        g1 = self.app.ResultSet([            self.app.AsyncResult('id1'),            self.app.AsyncResult('id2'),        ])        g2 = self.app.ResultSet([            self.app.AsyncResult('id1'),            self.app.AsyncResult('id2'),        ])        g3 = self.app.ResultSet([            self.app.AsyncResult('id3'),            self.app.AsyncResult('id1'),        ])        assert g1 == g2        assert g1 != g3        assert g1 != object()    def test_takes_app_from_first_task(self):        x = ResultSet([self.app.AsyncResult('id1')])        assert x.app is x.results[0].app        x.app = self.app        assert x.app is self.app    def test_get_empty(self):        x = self.app.ResultSet([])        assert x.supports_native_join is None        x.join = Mock(name='join')        x.get()        x.join.assert_called()    def test_add(self):        x = self.app.ResultSet([self.app.AsyncResult(1)])        x.add(self.app.AsyncResult(2))        assert len(x) == 2        x.add(self.app.AsyncResult(2))        assert len(x) == 2    @contextmanager    def dummy_copy(self):        with patch('celery.result.copy') as copy:            def passt(arg):                return arg            copy.side_effect = passt            yield    def test_iterate_respects_subpolling_interval(self):        r1 = self.app.AsyncResult(uuid())        r2 = self.app.AsyncResult(uuid())        backend = r1.backend = r2.backend = Mock()        backend.subpolling_interval = 10        ready = r1.ready = r2.ready = Mock()        def se(*args, **kwargs):            ready.side_effect = KeyError()            return False        ready.return_value = False        ready.side_effect = se        x = self.app.ResultSet([r1, r2])        with self.dummy_copy():            with patch('celery.result.time') as _time:                with pytest.warns(CPendingDeprecationWarning):                    with pytest.raises(KeyError):                        list(x.iterate())                _time.sleep.assert_called_with(10)            backend.subpolling_interval = 0            with patch('celery.result.time') as _time:                with pytest.warns(CPendingDeprecationWarning):                    with pytest.raises(KeyError):                        ready.return_value = False                        ready.side_effect = se                        list(x.iterate())                    _time.sleep.assert_not_called()    def test_times_out(self):        r1 = self.app.AsyncResult(uuid)        r1.ready = Mock()        r1.ready.return_value = False        x = self.app.ResultSet([r1])        with self.dummy_copy():            with patch('celery.result.time'):                with pytest.warns(CPendingDeprecationWarning):                    with pytest.raises(TimeoutError):                        list(x.iterate(timeout=1))    def test_add_discard(self):        x = self.app.ResultSet([])        x.add(self.app.AsyncResult('1'))        assert self.app.AsyncResult('1') in x.results        x.discard(self.app.AsyncResult('1'))        x.discard(self.app.AsyncResult('1'))        x.discard('1')        assert self.app.AsyncResult('1') not in x.results        x.update([self.app.AsyncResult('2')])    def test_clear(self):        x = self.app.ResultSet([])        r = x.results        x.clear()        assert x.results is rclass MockAsyncResultFailure(AsyncResult):    @property    def result(self):        return KeyError('baz')    @property    def state(self):        return states.FAILURE    def get(self, propagate=True, **kwargs):        if propagate:            raise self.result        return self.resultclass MockAsyncResultSuccess(AsyncResult):    forgotten = False    def __init__(self, *args, **kwargs):        self._result = kwargs.pop('result', 42)        super(MockAsyncResultSuccess, self).__init__(*args, **kwargs)    def forget(self):        self.forgotten = True    @property    def result(self):        return self._result    @property    def state(self):        return states.SUCCESS    def get(self, **kwargs):        return self.resultclass SimpleBackend(SyncBackendMixin):    ids = []    def __init__(self, ids=[]):        self.ids = ids    def _ensure_not_eager(self):        pass    def get_many(self, *args, **kwargs):        return ((id, {'result': i, 'status': states.SUCCESS})                for i, id in enumerate(self.ids))class test_GroupResult:    def setup(self):        self.size = 10        self.ts = self.app.GroupResult(            uuid(), make_mock_group(self.app, self.size),        )    @pytest.mark.usefixtures('depends_on_current_app')    def test_is_pickleable(self):        ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])        assert pickle.loads(pickle.dumps(ts)) == ts        ts2 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])        assert pickle.loads(pickle.dumps(ts2)) == ts2    @pytest.mark.usefixtures('depends_on_current_app')    def test_reduce(self):        ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])        fun, args = ts.__reduce__()        ts2 = fun(*args)        assert ts2.id == ts.id        assert ts == ts2    def test_eq_ne(self):        ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])        ts2 = self.app.GroupResult(ts.id, ts.results)        ts3 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])        ts4 = self.app.GroupResult(ts.id, [self.app.AsyncResult(uuid())])        assert ts == ts2        assert ts != ts3        assert ts != ts4        assert ts != object()    def test_len(self):        assert len(self.ts) == self.size    def test_eq_other(self):        assert self.ts != 1    def test_eq_with_parent(self):        # GroupResult instances with different .parent are not equal        grp_res = self.app.GroupResult(            uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],            parent=self.app.AsyncResult(uuid())        )        grp_res_2 = self.app.GroupResult(grp_res.id, grp_res.results)        assert grp_res != grp_res_2        grp_res_2.parent = self.app.AsyncResult(uuid())        assert grp_res != grp_res_2        grp_res_2.parent = grp_res.parent        assert grp_res == grp_res_2    @pytest.mark.usefixtures('depends_on_current_app')    def test_pickleable(self):        assert pickle.loads(pickle.dumps(self.ts))    def test_iterate_raises(self):        ar = MockAsyncResultFailure(uuid(), app=self.app)        ts = self.app.GroupResult(uuid(), [ar])        with pytest.warns(CPendingDeprecationWarning):            it = ts.iterate()        with pytest.raises(KeyError):            next(it)    def test_forget(self):        subs = [MockAsyncResultSuccess(uuid(), app=self.app),                MockAsyncResultSuccess(uuid(), app=self.app)]        ts = self.app.GroupResult(uuid(), subs)        ts.forget()        for sub in subs:            assert sub.forgotten    def test_get_nested_without_native_join(self):        backend = SimpleBackend()        backend.supports_native_join = False        ts = self.app.GroupResult(uuid(), [            MockAsyncResultSuccess(uuid(), result='1.1',                                   app=self.app, backend=backend),            self.app.GroupResult(uuid(), [                MockAsyncResultSuccess(uuid(), result='2.1',                                       app=self.app, backend=backend),                self.app.GroupResult(uuid(), [                    MockAsyncResultSuccess(uuid(), result='3.1',                                           app=self.app, backend=backend),                    MockAsyncResultSuccess(uuid(), result='3.2',                                           app=self.app, backend=backend),                ]),            ]),        ])        ts.app.backend = backend        vals = ts.get()        assert vals == [            '1.1',            [                '2.1',                [                    '3.1',                    '3.2',                ]            ],        ]    def test_getitem(self):        subs = [MockAsyncResultSuccess(uuid(), app=self.app),                MockAsyncResultSuccess(uuid(), app=self.app)]        ts = self.app.GroupResult(uuid(), subs)        assert ts[0] is subs[0]    def test_save_restore(self):        subs = [MockAsyncResultSuccess(uuid(), app=self.app),                MockAsyncResultSuccess(uuid(), app=self.app)]        ts = self.app.GroupResult(uuid(), subs)        ts.save()        with pytest.raises(AttributeError):            ts.save(backend=object())        assert self.app.GroupResult.restore(ts.id).results == ts.results        ts.delete()        assert self.app.GroupResult.restore(ts.id) is None        with pytest.raises(AttributeError):            self.app.GroupResult.restore(ts.id, backend=object())    def test_save_restore_empty(self):        subs = []        ts = self.app.GroupResult(uuid(), subs)        ts.save()        assert isinstance(            self.app.GroupResult.restore(ts.id),            self.app.GroupResult,        )        assert self.app.GroupResult.restore(ts.id).results == ts.results == []    def test_restore_app(self):        subs = [MockAsyncResultSuccess(uuid(), app=self.app)]        ts = self.app.GroupResult(uuid(), subs)        ts.save()        restored = GroupResult.restore(ts.id, app=self.app)        assert restored.id == ts.id    def test_restore_current_app_fallback(self):        subs = [MockAsyncResultSuccess(uuid(), app=self.app)]        ts = self.app.GroupResult(uuid(), subs)        ts.save()        with pytest.raises(RuntimeError,                           message="Test depends on current_app"):            GroupResult.restore(ts.id)    def test_join_native(self):        backend = SimpleBackend()        results = [self.app.AsyncResult(uuid(), backend=backend)                   for i in range(10)]        ts = self.app.GroupResult(uuid(), results)        ts.app.backend = backend        backend.ids = [result.id for result in results]        res = ts.join_native()        assert res == list(range(10))        callback = Mock(name='callback')        assert not ts.join_native(callback=callback)        callback.assert_has_calls([            call(r.id, i) for i, r in enumerate(ts.results)        ])    def test_join_native_raises(self):        ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])        ts.iter_native = Mock()        ts.iter_native.return_value = iter([            (uuid(), {'status': states.FAILURE, 'result': KeyError()})        ])        with pytest.raises(KeyError):            ts.join_native(propagate=True)    def test_failed_join_report(self):        res = Mock()        ts = self.app.GroupResult(uuid(), [res])        res.state = states.FAILURE        res.backend.is_cached.return_value = True        assert next(ts._failed_join_report()) is res        res.backend.is_cached.return_value = False        with pytest.raises(StopIteration):            next(ts._failed_join_report())    def test_repr(self):        assert repr(            self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())]))    def test_children_is_results(self):        ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])        assert ts.children is ts.results    def test_iter_native(self):        backend = SimpleBackend()        results = [self.app.AsyncResult(uuid(), backend=backend)                   for i in range(10)]        ts = self.app.GroupResult(uuid(), results)        ts.app.backend = backend        backend.ids = [result.id for result in results]        assert len(list(ts.iter_native())) == 10    def test_iterate_yields(self):        ar = MockAsyncResultSuccess(uuid(), app=self.app)        ar2 = MockAsyncResultSuccess(uuid(), app=self.app)        ts = self.app.GroupResult(uuid(), [ar, ar2])        with pytest.warns(CPendingDeprecationWarning):            it = ts.iterate()        assert next(it) == 42        assert next(it) == 42    def test_iterate_eager(self):        ar1 = EagerResult(uuid(), 42, states.SUCCESS)        ar2 = EagerResult(uuid(), 42, states.SUCCESS)        ts = self.app.GroupResult(uuid(), [ar1, ar2])        with pytest.warns(CPendingDeprecationWarning):            it = ts.iterate()        assert next(it) == 42        assert next(it) == 42    def test_join_timeout(self):        ar = MockAsyncResultSuccess(uuid(), app=self.app)        ar2 = MockAsyncResultSuccess(uuid(), app=self.app)        ar3 = self.app.AsyncResult(uuid())        ts = self.app.GroupResult(uuid(), [ar, ar2, ar3])        with pytest.raises(TimeoutError):            ts.join(timeout=0.0000001)        ar4 = self.app.AsyncResult(uuid())        ar4.get = Mock()        ts2 = self.app.GroupResult(uuid(), [ar4])        assert ts2.join(timeout=0.1)        callback = Mock(name='callback')        assert not ts2.join(timeout=0.1, callback=callback)        callback.assert_called_with(ar4.id, ar4.get())    def test_iter_native_when_empty_group(self):        ts = self.app.GroupResult(uuid(), [])        assert list(ts.iter_native()) == []    def test_iterate_simple(self):        with pytest.warns(CPendingDeprecationWarning):            it = self.ts.iterate()        results = sorted(list(it))        assert results == list(range(self.size))    def test___iter__(self):        assert list(iter(self.ts)) == self.ts.results    def test_join(self):        joined = self.ts.join()        assert joined == list(range(self.size))    def test_successful(self):        assert self.ts.successful()    def test_failed(self):        assert not self.ts.failed()    def test_maybe_throw(self):        self.ts.results = [Mock(name='r1')]        self.ts.maybe_throw()        self.ts.results[0].maybe_throw.assert_called_with(            callback=None, propagate=True,        )    def test_join__on_message(self):        with pytest.raises(ImproperlyConfigured):            self.ts.join(on_message=Mock())    def test_waiting(self):        assert not self.ts.waiting()    def test_ready(self):        assert self.ts.ready()    def test_completed_count(self):        assert self.ts.completed_count() == len(self.ts)class test_pending_AsyncResult:    def test_result(self, app):        res = app.AsyncResult(uuid())        assert res.result is Noneclass test_failed_AsyncResult:    def setup(self):        self.size = 11        self.app.conf.result_serializer = 'pickle'        results = make_mock_group(self.app, 10)        failed = mock_task('ts11', states.FAILURE, KeyError('Baz'))        save_result(self.app, failed)        failed_res = self.app.AsyncResult(failed['id'])        self.ts = self.app.GroupResult(uuid(), results + [failed_res])    def test_completed_count(self):        assert self.ts.completed_count() == len(self.ts) - 1    def test_iterate_simple(self):        with pytest.warns(CPendingDeprecationWarning):            it = self.ts.iterate()        def consume():            return list(it)        with pytest.raises(KeyError):            consume()    def test_join(self):        with pytest.raises(KeyError):            self.ts.join()    def test_successful(self):        assert not self.ts.successful()    def test_failed(self):        assert self.ts.failed()class test_pending_Group:    def setup(self):        self.ts = self.app.GroupResult(            uuid(), [self.app.AsyncResult(uuid()),                     self.app.AsyncResult(uuid())])    def test_completed_count(self):        assert self.ts.completed_count() == 0    def test_ready(self):        assert not self.ts.ready()    def test_waiting(self):        assert self.ts.waiting()    def test_join(self):        with pytest.raises(TimeoutError):            self.ts.join(timeout=0.001)    def test_join_longer(self):        with pytest.raises(TimeoutError):            self.ts.join(timeout=1)class test_EagerResult:    def setup(self):        @self.app.task(shared=False)        def raising(x, y):            raise KeyError(x, y)        self.raising = raising    def test_wait_raises(self):        res = self.raising.apply(args=[3, 3])        with pytest.raises(KeyError):            res.wait()        assert res.wait(propagate=False)    def test_wait(self):        res = EagerResult('x', 'x', states.RETRY)        res.wait()        assert res.state == states.RETRY        assert res.status == states.RETRY    def test_forget(self):        res = EagerResult('x', 'x', states.RETRY)        res.forget()    def test_revoke(self):        res = self.raising.apply(args=[3, 3])        assert not res.revoke()    @patch('celery.result.task_join_will_block')    def test_get_sync_subtask_option(self, task_join_will_block):        task_join_will_block.return_value = True        tid = uuid()        res_subtask_async = EagerResult(tid, 'x', 'x', states.SUCCESS)        with pytest.raises(RuntimeError):            res_subtask_async.get()        res_subtask_async.get(disable_sync_subtasks=False)class test_tuples:    def test_AsyncResult(self):        x = self.app.AsyncResult(uuid())        assert x, result_from_tuple(x.as_tuple() == self.app)        assert x, result_from_tuple(x == self.app)    def test_with_parent(self):        x = self.app.AsyncResult(uuid())        x.parent = self.app.AsyncResult(uuid())        y = result_from_tuple(x.as_tuple(), self.app)        assert y == x        assert y.parent == x.parent        assert isinstance(y.parent, AsyncResult)    def test_compat(self):        uid = uuid()        x = result_from_tuple([uid, []], app=self.app)        assert x.id == uid    def test_GroupResult(self):        x = self.app.GroupResult(            uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],        )        assert x, result_from_tuple(x.as_tuple() == self.app)        assert x, result_from_tuple(x == self.app)    def test_GroupResult_with_parent(self):        parent = self.app.AsyncResult(uuid())        result = self.app.GroupResult(            uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],            parent        )        second_result = result_from_tuple(result.as_tuple(), self.app)        assert second_result == result        assert second_result.parent == parent    def test_GroupResult_as_tuple(self):        parent = self.app.AsyncResult(uuid())        result = self.app.GroupResult(            'group-result-1',            [self.app.AsyncResult('async-result-{}'.format(i))             for i in range(2)],            parent        )        (result_id, parent_tuple), group_results = result.as_tuple()        assert result_id == result.id        assert parent_tuple == parent.as_tuple()        assert parent_tuple[0][0] == parent.id        assert isinstance(group_results, list)        expected_grp_res = [(('async-result-{}'.format(i), None), None)                            for i in range(2)]        assert group_results == expected_grp_res
 |