| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 | from __future__ import absolute_import, unicode_literalsimport picklefrom contextlib import contextmanagerfrom datetime import timedeltafrom pickle import dumps, loadsimport pytestfrom billiard.einfo import ExceptionInfofrom case import Mock, mockfrom celery import states, uuidfrom celery.backends.amqp import AMQPBackendfrom celery.five import Empty, Queue, rangefrom celery.result import AsyncResultclass SomeClass(object):    def __init__(self, data):        self.data = dataclass test_AMQPBackend:    def setup(self):        self.app.conf.result_cache_max = 100    def create_backend(self, **opts):        opts = dict({'serializer': 'pickle', 'persistent': True}, **opts)        return AMQPBackend(self.app, **opts)    def test_destination_for(self):        b = self.create_backend()        request = Mock()        assert b.destination_for('id', request) == (            b.rkey('id'), request.correlation_id,        )    def test_store_result__no_routing_key(self):        b = self.create_backend()        b.destination_for = Mock()        b.destination_for.return_value = None, None        b.store_result('id', None, states.SUCCESS)    def test_mark_as_done(self):        tb1 = self.create_backend(max_cached_results=1)        tb2 = self.create_backend(max_cached_results=1)        tid = uuid()        tb1.mark_as_done(tid, 42)        assert tb2.get_state(tid) == states.SUCCESS        assert tb2.get_result(tid) == 42        assert tb2._cache.get(tid)        assert tb2.get_result(tid), 42    @pytest.mark.usefixtures('depends_on_current_app')    def test_pickleable(self):        assert loads(dumps(self.create_backend()))    def test_revive(self):        tb = self.create_backend()        tb.revive(None)    def test_is_pickled(self):        tb1 = self.create_backend()        tb2 = self.create_backend()        tid2 = uuid()        result = {'foo': 'baz', 'bar': SomeClass(12345)}        tb1.mark_as_done(tid2, result)        # is serialized properly.        rindb = tb2.get_result(tid2)        assert rindb.get('foo') == 'baz'        assert rindb.get('bar').data == 12345    def test_mark_as_failure(self):        tb1 = self.create_backend()        tb2 = self.create_backend()        tid3 = uuid()        try:            raise KeyError('foo')        except KeyError as exception:            einfo = ExceptionInfo()            tb1.mark_as_failure(tid3, exception, traceback=einfo.traceback)            assert tb2.get_state(tid3) == states.FAILURE            assert isinstance(tb2.get_result(tid3), KeyError)            assert tb2.get_traceback(tid3) == einfo.traceback    def test_repair_uuid(self):        from celery.backends.amqp import repair_uuid        for i in range(10):            tid = uuid()            assert repair_uuid(tid.replace('-', '')) == tid    def test_expires_is_int(self):        b = self.create_backend(expires=48)        q = b._create_binding('x1y2z3')        assert q.expires == 48    def test_expires_is_float(self):        b = self.create_backend(expires=48.3)        q = b._create_binding('x1y2z3')        assert q.expires == 48.3    def test_expires_is_timedelta(self):        b = self.create_backend(expires=timedelta(minutes=1))        q = b._create_binding('x1y2z3')        assert q.expires == 60    @mock.sleepdeprived()    def test_store_result_retries(self):        iterations = [0]        stop_raising_at = [5]        def publish(*args, **kwargs):            if iterations[0] > stop_raising_at[0]:                return            iterations[0] += 1            raise KeyError('foo')        backend = AMQPBackend(self.app)        from celery.app.amqp import Producer        prod, Producer.publish = Producer.publish, publish        try:            with pytest.raises(KeyError):                backend.retry_policy['max_retries'] = None                backend.store_result('foo', 'bar', 'STARTED')            with pytest.raises(KeyError):                backend.retry_policy['max_retries'] = 10                backend.store_result('foo', 'bar', 'STARTED')        finally:            Producer.publish = prod    def test_poll_no_messages(self):        b = self.create_backend()        assert b.get_task_meta(uuid())['status'] == states.PENDING    @contextmanager    def _result_context(self):        results = Queue()        class Message(object):            acked = 0            requeued = 0            def __init__(self, **merge):                self.payload = dict({'status': states.STARTED,                                     'result': None}, **merge)                self.properties = {'correlation_id': merge.get('task_id')}                self.body = pickle.dumps(self.payload)                self.content_type = 'application/x-python-serialize'                self.content_encoding = 'binary'            def ack(self, *args, **kwargs):                self.acked += 1            def requeue(self, *args, **kwargs):                self.requeued += 1        class MockBinding(object):            def __init__(self, *args, **kwargs):                self.channel = Mock()            def __call__(self, *args, **kwargs):                return self            def declare(self):                pass            def get(self, no_ack=False, accept=None):                try:                    m = results.get(block=False)                    if m:                        m.accept = accept                    return m                except Empty:                    pass            def is_bound(self):                return True        class MockBackend(AMQPBackend):            Queue = MockBinding        backend = MockBackend(self.app, max_cached_results=100)        backend._republish = Mock()        yield results, backend, Message    def test_backlog_limit_exceeded(self):        with self._result_context() as (results, backend, Message):            for i in range(1001):                results.put(Message(task_id='id', status=states.RECEIVED))            with pytest.raises(backend.BacklogLimitExceeded):                backend.get_task_meta('id')    def test_poll_result(self):        with self._result_context() as (results, backend, Message):            tid = uuid()            # FFWD's to the latest state.            state_messages = [                Message(task_id=tid, status=states.RECEIVED, seq=1),                Message(task_id=tid, status=states.STARTED, seq=2),                Message(task_id=tid, status=states.FAILURE, seq=3),            ]            for state_message in state_messages:                results.put(state_message)            r1 = backend.get_task_meta(tid)            # FFWDs to the last state.            assert r1['status'] == states.FAILURE            assert r1['seq'] == 3            # Caches last known state.            tid = uuid()            results.put(Message(task_id=tid))            backend.get_task_meta(tid)            assert tid, backend._cache in 'Caches last known state'            assert state_messages[-1].requeued            # Returns cache if no new states.            results.queue.clear()            assert not results.qsize()            backend._cache[tid] = 'hello'            # returns cache if no new states.            assert backend.get_task_meta(tid) == 'hello'    def test_drain_events_decodes_exceptions_in_meta(self):        tid = uuid()        b = self.create_backend(serializer='json')        b.store_result(tid, RuntimeError('aap'), states.FAILURE)        result = AsyncResult(tid, backend=b)        with pytest.raises(Exception) as excinfo:            result.get()        assert excinfo.value.__class__.__name__ == 'RuntimeError'        assert str(excinfo.value) == 'aap'    def test_no_expires(self):        b = self.create_backend(expires=None)        app = self.app        app.conf.result_expires = None        b = self.create_backend(expires=None)        q = b._create_binding('foo')        assert q.expires is None    def test_process_cleanup(self):        self.create_backend().process_cleanup()    def test_reload_task_result(self):        with pytest.raises(NotImplementedError):            self.create_backend().reload_task_result('x')    def test_reload_group_result(self):        with pytest.raises(NotImplementedError):            self.create_backend().reload_group_result('x')    def test_save_group(self):        with pytest.raises(NotImplementedError):            self.create_backend().save_group('x', 'x')    def test_restore_group(self):        with pytest.raises(NotImplementedError):            self.create_backend().restore_group('x')    def test_delete_group(self):        with pytest.raises(NotImplementedError):            self.create_backend().delete_group('x')
 |