| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624 | from __future__ import absolute_import, unicode_literalsimport sysimport typesfrom contextlib import contextmanagerimport pytestfrom case import ANY, Mock, call, patch, skipfrom celery import chord, group, states, uuidfrom celery.backends.base import (BaseBackend, DisabledBackend,                                  KeyValueStoreBackend, _nulldict)from celery.exceptions import ChordError, TimeoutErrorfrom celery.five import bytes_if_py2, items, rangefrom celery.result import result_from_tuplefrom celery.utils import serializationfrom celery.utils.functional import pass1from celery.utils.serialization import UnpickleableExceptionWrapperfrom celery.utils.serialization import find_pickleable_exception as fnpefrom celery.utils.serialization import get_pickleable_exception as gpefrom celery.utils.serialization import subclass_exceptionclass wrapobject(object):    def __init__(self, *args, **kwargs):        self.args = argsif sys.version_info[0] == 3 or getattr(sys, 'pypy_version_info', None):    Oldstyle = Noneelse:    Oldstyle = types.ClassType(bytes_if_py2('Oldstyle'), (), {})Unpickleable = subclass_exception(    bytes_if_py2('Unpickleable'), KeyError, 'foo.module',)Impossible = subclass_exception(    bytes_if_py2('Impossible'), object, 'foo.module',)Lookalike = subclass_exception(    bytes_if_py2('Lookalike'), wrapobject, 'foo.module',)class test_nulldict:    def test_nulldict(self):        x = _nulldict()        x['foo'] = 1        x.update(foo=1, bar=2)        x.setdefault('foo', 3)class test_serialization:    def test_create_exception_cls(self):        assert serialization.create_exception_cls('FooError', 'm')        assert serialization.create_exception_cls('FooError', 'm', KeyError)class test_BaseBackend_interface:    def setup(self):        self.b = BaseBackend(self.app)    def test__forget(self):        with pytest.raises(NotImplementedError):            self.b._forget('SOMExx-N0Nex1stant-IDxx-')    def test_forget(self):        with pytest.raises(NotImplementedError):            self.b.forget('SOMExx-N0nex1stant-IDxx-')    def test_on_chord_part_return(self):        self.b.on_chord_part_return(None, None, None)    def test_apply_chord(self, unlock='celery.chord_unlock'):        self.app.tasks[unlock] = Mock()        self.b.apply_chord(            group(app=self.app), (), 'dakj221', None,            result=[self.app.AsyncResult(x) for x in [1, 2, 3]],        )        assert self.app.tasks[unlock].apply_async.call_countclass test_exception_pickle:    @skip.if_python3(reason='does not support old style classes')    @skip.if_pypy()    def test_oldstyle(self):        assert fnpe(Oldstyle())    def test_BaseException(self):        assert fnpe(Exception()) is None    def test_get_pickleable_exception(self):        exc = Exception('foo')        assert gpe(exc) == exc    def test_unpickleable(self):        assert isinstance(fnpe(Unpickleable()), KeyError)        assert fnpe(Impossible()) is Noneclass test_prepare_exception:    def setup(self):        self.b = BaseBackend(self.app)    def test_unpickleable(self):        self.b.serializer = 'pickle'        x = self.b.prepare_exception(Unpickleable(1, 2, 'foo'))        assert isinstance(x, KeyError)        y = self.b.exception_to_python(x)        assert isinstance(y, KeyError)    def test_impossible(self):        self.b.serializer = 'pickle'        x = self.b.prepare_exception(Impossible())        assert isinstance(x, UnpickleableExceptionWrapper)        assert str(x)        y = self.b.exception_to_python(x)        assert y.__class__.__name__ == 'Impossible'        if sys.version_info < (2, 5):            assert y.__class__.__module__        else:            assert y.__class__.__module__ == 'foo.module'    def test_regular(self):        self.b.serializer = 'pickle'        x = self.b.prepare_exception(KeyError('baz'))        assert isinstance(x, KeyError)        y = self.b.exception_to_python(x)        assert isinstance(y, KeyError)    def test_unicode_message(self):        message = u'\u03ac'        x = self.b.prepare_exception(Exception(message))        assert x == {'exc_message': (message,),                     'exc_type': Exception.__name__,                     'exc_module': Exception.__module__}class KVBackend(KeyValueStoreBackend):    mget_returns_dict = False    def __init__(self, app, *args, **kwargs):        self.db = {}        super(KVBackend, self).__init__(app)    def get(self, key):        return self.db.get(key)    def set(self, key, value):        self.db[key] = value    def mget(self, keys):        if self.mget_returns_dict:            return {key: self.get(key) for key in keys}        else:            return [self.get(k) for k in keys]    def delete(self, key):        self.db.pop(key, None)class DictBackend(BaseBackend):    def __init__(self, *args, **kwargs):        BaseBackend.__init__(self, *args, **kwargs)        self._data = {'can-delete': {'result': 'foo'}}    def _restore_group(self, group_id):        if group_id == 'exists':            return {'result': 'group'}    def _get_task_meta_for(self, task_id):        if task_id == 'task-exists':            return {'result': 'task'}    def _delete_group(self, group_id):        self._data.pop(group_id, None)class test_BaseBackend_dict:    def setup(self):        self.b = DictBackend(app=self.app)    def test_delete_group(self):        self.b.delete_group('can-delete')        assert 'can-delete' not in self.b._data    def test_prepare_exception_json(self):        x = DictBackend(self.app, serializer='json')        e = x.prepare_exception(KeyError('foo'))        assert 'exc_type' in e        e = x.exception_to_python(e)        assert e.__class__.__name__ == 'KeyError'        assert str(e).strip('u') == "'foo'"    def test_save_group(self):        b = BaseBackend(self.app)        b._save_group = Mock()        b.save_group('foofoo', 'xxx')        b._save_group.assert_called_with('foofoo', 'xxx')    def test_add_to_chord_interface(self):        b = BaseBackend(self.app)        with pytest.raises(NotImplementedError):            b.add_to_chord('group_id', 'sig')    def test_forget_interface(self):        b = BaseBackend(self.app)        with pytest.raises(NotImplementedError):            b.forget('foo')    def test_restore_group(self):        assert self.b.restore_group('missing') is None        assert self.b.restore_group('missing') is None        assert self.b.restore_group('exists') == 'group'        assert self.b.restore_group('exists') == 'group'        assert self.b.restore_group('exists', cache=False) == 'group'    def test_reload_group_result(self):        self.b._cache = {}        self.b.reload_group_result('exists')        self.b._cache['exists'] = {'result': 'group'}    def test_reload_task_result(self):        self.b._cache = {}        self.b.reload_task_result('task-exists')        self.b._cache['task-exists'] = {'result': 'task'}    def test_fail_from_current_stack(self):        self.b.mark_as_failure = Mock()        try:            raise KeyError('foo')        except KeyError as exc:            self.b.fail_from_current_stack('task_id')            self.b.mark_as_failure.assert_called()            args = self.b.mark_as_failure.call_args[0]            assert args[0] == 'task_id'            assert args[1] is exc            assert args[2]    def test_prepare_value_serializes_group_result(self):        self.b.serializer = 'json'        g = self.app.GroupResult('group_id', [self.app.AsyncResult('foo')])        v = self.b.prepare_value(g)        assert isinstance(v, (list, tuple))        assert result_from_tuple(v, app=self.app) == g        v2 = self.b.prepare_value(g[0])        assert isinstance(v2, (list, tuple))        assert result_from_tuple(v2, app=self.app) == g[0]        self.b.serializer = 'pickle'        assert isinstance(self.b.prepare_value(g), self.app.GroupResult)    def test_is_cached(self):        b = BaseBackend(app=self.app, max_cached_results=1)        b._cache['foo'] = 1        assert b.is_cached('foo')        assert not b.is_cached('false')    def test_mark_as_done__chord(self):        b = BaseBackend(app=self.app)        b._store_result = Mock()        request = Mock(name='request')        b.on_chord_part_return = Mock()        b.mark_as_done('id', 10, request=request)        b.on_chord_part_return.assert_called_with(request, states.SUCCESS, 10)    def test_mark_as_failure__chord(self):        b = BaseBackend(app=self.app)        b._store_result = Mock()        request = Mock(name='request')        request.errbacks = []        b.on_chord_part_return = Mock()        exc = KeyError()        b.mark_as_failure('id', exc, request=request)        b.on_chord_part_return.assert_called_with(request, states.FAILURE, exc)    def test_mark_as_revoked__chord(self):        b = BaseBackend(app=self.app)        b._store_result = Mock()        request = Mock(name='request')        request.errbacks = []        b.on_chord_part_return = Mock()        b.mark_as_revoked('id', 'revoked', request=request)        b.on_chord_part_return.assert_called_with(request, states.REVOKED, ANY)    def test_chord_error_from_stack_raises(self):        b = BaseBackend(app=self.app)        exc = KeyError()        callback = Mock(name='callback')        callback.options = {'link_error': []}        task = self.app.tasks[callback.task] = Mock()        b.fail_from_current_stack = Mock()        group = self.patching('celery.group')        group.side_effect = exc        b.chord_error_from_stack(callback, exc=ValueError())        task.backend.fail_from_current_stack.assert_called_with(            callback.id, exc=exc)    def test_exception_to_python_when_None(self):        b = BaseBackend(app=self.app)        assert b.exception_to_python(None) is None    def test_wait_for__on_interval(self):        self.patching('time.sleep')        b = BaseBackend(app=self.app)        b._get_task_meta_for = Mock()        b._get_task_meta_for.return_value = {'status': states.PENDING}        callback = Mock(name='callback')        with pytest.raises(TimeoutError):            b.wait_for(task_id='1', on_interval=callback, timeout=1)        callback.assert_called_with()        b._get_task_meta_for.return_value = {'status': states.SUCCESS}        b.wait_for(task_id='1', timeout=None)    def test_get_children(self):        b = BaseBackend(app=self.app)        b._get_task_meta_for = Mock()        b._get_task_meta_for.return_value = {}        assert b.get_children('id') is None        b._get_task_meta_for.return_value = {'children': 3}        assert b.get_children('id') == 3class test_KeyValueStoreBackend:    def setup(self):        self.b = KVBackend(app=self.app)    def test_on_chord_part_return(self):        assert not self.b.implements_incr        self.b.on_chord_part_return(None, None, None)    def test_get_store_delete_result(self):        tid = uuid()        self.b.mark_as_done(tid, 'Hello world')        assert self.b.get_result(tid) == 'Hello world'        assert self.b.get_state(tid) == states.SUCCESS        self.b.forget(tid)        assert self.b.get_state(tid) == states.PENDING    def test_strip_prefix(self):        x = self.b.get_key_for_task('x1b34')        assert self.b._strip_prefix(x) == 'x1b34'        assert self.b._strip_prefix('x1b34') == 'x1b34'    def test_get_many(self):        for is_dict in True, False:            self.b.mget_returns_dict = is_dict            ids = {uuid(): i for i in range(10)}            for id, i in items(ids):                self.b.mark_as_done(id, i)            it = self.b.get_many(list(ids), interval=0.01)            for i, (got_id, got_state) in enumerate(it):                assert got_state['result'] == ids[got_id]            assert i == 9            assert list(self.b.get_many(list(ids), interval=0.01))            self.b._cache.clear()            callback = Mock(name='callback')            it = self.b.get_many(                list(ids),                on_message=callback,                interval=0.05            )            for i, (got_id, got_state) in enumerate(it):                assert got_state['result'] == ids[got_id]            assert i == 9            assert list(                self.b.get_many(list(ids), interval=0.01)            )            callback.assert_has_calls([                call(ANY) for id in ids            ])    def test_get_many_times_out(self):        tasks = [uuid() for _ in range(4)]        self.b._cache[tasks[1]] = {'status': 'PENDING'}        with pytest.raises(self.b.TimeoutError):            list(self.b.get_many(tasks, timeout=0.01, interval=0.01))    def test_chord_part_return_no_gid(self):        self.b.implements_incr = True        task = Mock()        state = 'SUCCESS'        result = 10        task.request.group = None        self.b.get_key_for_chord = Mock()        self.b.get_key_for_chord.side_effect = AssertionError(            'should not get here',        )        assert self.b.on_chord_part_return(            task.request, state, result) is None    @patch('celery.backends.base.GroupResult')    @patch('celery.backends.base.maybe_signature')    def test_chord_part_return_restore_raises(self, maybe_signature,                                              GroupResult):        self.b.implements_incr = True        GroupResult.restore.side_effect = KeyError()        self.b.chord_error_from_stack = Mock()        callback = Mock(name='callback')        request = Mock(name='request')        request.group = 'gid'        maybe_signature.return_value = callback        self.b.on_chord_part_return(request, states.SUCCESS, 10)        self.b.chord_error_from_stack.assert_called_with(            callback, ANY,        )    @patch('celery.backends.base.GroupResult')    @patch('celery.backends.base.maybe_signature')    def test_chord_part_return_restore_empty(self, maybe_signature,                                             GroupResult):        self.b.implements_incr = True        GroupResult.restore.return_value = None        self.b.chord_error_from_stack = Mock()        callback = Mock(name='callback')        request = Mock(name='request')        request.group = 'gid'        maybe_signature.return_value = callback        self.b.on_chord_part_return(request, states.SUCCESS, 10)        self.b.chord_error_from_stack.assert_called_with(            callback, ANY,        )    def test_filter_ready(self):        self.b.decode_result = Mock()        self.b.decode_result.side_effect = pass1        assert len(list(self.b._filter_ready([            (1, {'status': states.RETRY}),            (2, {'status': states.FAILURE}),            (3, {'status': states.SUCCESS}),        ]))) == 2    @contextmanager    def _chord_part_context(self, b):        @self.app.task(shared=False)        def callback(result):            pass        b.implements_incr = True        b.client = Mock()        with patch('celery.backends.base.GroupResult') as GR:            deps = GR.restore.return_value = Mock(name='DEPS')            deps.__len__ = Mock()            deps.__len__.return_value = 10            b.incr = Mock()            b.incr.return_value = 10            b.expire = Mock()            task = Mock()            task.request.group = 'grid'            cb = task.request.chord = callback.s()            task.request.chord.freeze()            callback.backend = b            callback.backend.fail_from_current_stack = Mock()            yield task, deps, cb    def test_chord_part_return_propagate_set(self):        with self._chord_part_context(self.b) as (task, deps, _):            self.b.on_chord_part_return(task.request, 'SUCCESS', 10)            self.b.expire.assert_not_called()            deps.delete.assert_called_with()            deps.join_native.assert_called_with(propagate=True, timeout=3.0)    def test_chord_part_return_propagate_default(self):        with self._chord_part_context(self.b) as (task, deps, _):            self.b.on_chord_part_return(task.request, 'SUCCESS', 10)            self.b.expire.assert_not_called()            deps.delete.assert_called_with()            deps.join_native.assert_called_with(propagate=True, timeout=3.0)    def test_chord_part_return_join_raises_internal(self):        with self._chord_part_context(self.b) as (task, deps, callback):            deps._failed_join_report = lambda: iter([])            deps.join_native.side_effect = KeyError('foo')            self.b.on_chord_part_return(task.request, 'SUCCESS', 10)            self.b.fail_from_current_stack.assert_called()            args = self.b.fail_from_current_stack.call_args            exc = args[1]['exc']            assert isinstance(exc, ChordError)            assert 'foo' in str(exc)    def test_chord_part_return_join_raises_task(self):        b = KVBackend(serializer='pickle', app=self.app)        with self._chord_part_context(b) as (task, deps, callback):            deps._failed_join_report = lambda: iter([                self.app.AsyncResult('culprit'),            ])            deps.join_native.side_effect = KeyError('foo')            b.on_chord_part_return(task.request, 'SUCCESS', 10)            b.fail_from_current_stack.assert_called()            args = b.fail_from_current_stack.call_args            exc = args[1]['exc']            assert isinstance(exc, ChordError)            assert 'Dependency culprit raised' in str(exc)    def test_restore_group_from_json(self):        b = KVBackend(serializer='json', app=self.app)        g = self.app.GroupResult(            'group_id',            [self.app.AsyncResult('a'), self.app.AsyncResult('b')],        )        b._save_group(g.id, g)        g2 = b._restore_group(g.id)['result']        assert g2 == g    def test_restore_group_from_pickle(self):        b = KVBackend(serializer='pickle', app=self.app)        g = self.app.GroupResult(            'group_id',            [self.app.AsyncResult('a'), self.app.AsyncResult('b')],        )        b._save_group(g.id, g)        g2 = b._restore_group(g.id)['result']        assert g2 == g    def test_chord_apply_fallback(self):        self.b.implements_incr = False        self.b.fallback_chord_unlock = Mock()        self.b.apply_chord(            group(app=self.app), (), 'group_id', 'body',            result='result', foo=1,        )        self.b.fallback_chord_unlock.assert_called_with(            'group_id', 'body', result='result', foo=1,        )    def test_get_missing_meta(self):        assert self.b.get_result('xxx-missing') is None        assert self.b.get_state('xxx-missing') == states.PENDING    def test_save_restore_delete_group(self):        tid = uuid()        tsr = self.app.GroupResult(            tid, [self.app.AsyncResult(uuid()) for _ in range(10)],        )        self.b.save_group(tid, tsr)        self.b.restore_group(tid)        assert self.b.restore_group(tid) == tsr        self.b.delete_group(tid)        assert self.b.restore_group(tid) is None    def test_restore_missing_group(self):        assert self.b.restore_group('xxx-nonexistant') is Noneclass test_KeyValueStoreBackend_interface:    def test_get(self):        with pytest.raises(NotImplementedError):            KeyValueStoreBackend(self.app).get('a')    def test_set(self):        with pytest.raises(NotImplementedError):            KeyValueStoreBackend(self.app).set('a', 1)    def test_incr(self):        with pytest.raises(NotImplementedError):            KeyValueStoreBackend(self.app).incr('a')    def test_cleanup(self):        assert not KeyValueStoreBackend(self.app).cleanup()    def test_delete(self):        with pytest.raises(NotImplementedError):            KeyValueStoreBackend(self.app).delete('a')    def test_mget(self):        with pytest.raises(NotImplementedError):            KeyValueStoreBackend(self.app).mget(['a'])    def test_forget(self):        with pytest.raises(NotImplementedError):            KeyValueStoreBackend(self.app).forget('a')class test_DisabledBackend:    def test_store_result(self):        DisabledBackend(self.app).store_result()    def test_is_disabled(self):        with pytest.raises(NotImplementedError):            DisabledBackend(self.app).get_state('foo')    def test_as_uri(self):        assert DisabledBackend(self.app).as_uri() == 'disabled://'    @pytest.mark.celery(result_backend='disabled')    def test_chord_raises_error(self):        with pytest.raises(NotImplementedError):            chord(self.add.s(i, i) for i in range(10))(self.add.s([2]))    @pytest.mark.celery(result_backend='disabled')    def test_chain_with_chord_raises_error(self):        with pytest.raises(NotImplementedError):            (self.add.s(2, 2) |             group(self.add.s(2, 2),                   self.add.s(5, 6)) | self.add.s()).delay()class test_as_uri:    def setup(self):        self.b = BaseBackend(            app=self.app,            url='sch://uuuu:pwpw@hostname.dom'        )    def test_as_uri_include_password(self):        assert self.b.as_uri(True) == self.b.url    def test_as_uri_exclude_password(self):        assert self.b.as_uri() == 'sch://uuuu:**@hostname.dom/'
 |