| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 | from __future__ import absolute_import, unicode_literalsimport pytestfrom case import Mock, patchfrom celery import chord, groupfrom celery.backends.rpc import RPCBackendfrom celery._state import _task_stackclass test_RPCBackend:    def setup(self):        self.b = RPCBackend(app=self.app)    def test_oid(self):        oid = self.b.oid        oid2 = self.b.oid        assert oid == oid2        assert oid == self.app.oid    def test_interface(self):        self.b.on_reply_declare('task_id')    def test_ensure_chords_allowed(self):        with pytest.raises(NotImplementedError):            self.b.ensure_chords_allowed()    def test_apply_chord(self):        with pytest.raises(NotImplementedError):            self.b.apply_chord([], (), 'gid', Mock(name='body'))    @pytest.mark.celery(result_backend='rpc')    def test_chord_raises_error(self):        with pytest.raises(NotImplementedError):            chord(self.add.s(i, i) for i in range(10))(self.add.s([2]))    @pytest.mark.celery(result_backend='rpc')    def test_chain_with_chord_raises_error(self):        with pytest.raises(NotImplementedError):            (self.add.s(2, 2) |             group(self.add.s(2, 2),                   self.add.s(5, 6)) | self.add.s()).delay()    def test_destination_for(self):        req = Mock(name='request')        req.reply_to = 'reply_to'        req.correlation_id = 'corid'        assert self.b.destination_for('task_id', req) == ('reply_to', 'corid')        task = Mock()        _task_stack.push(task)        try:            task.request.reply_to = 'reply_to'            task.request.correlation_id = 'corid'            assert self.b.destination_for('task_id', None) == (                'reply_to', 'corid',            )        finally:            _task_stack.pop()        with pytest.raises(RuntimeError):            self.b.destination_for('task_id', None)    def test_binding(self):        queue = self.b.binding        assert queue.name == self.b.oid        assert queue.exchange == self.b.exchange        assert queue.routing_key == self.b.oid        assert not queue.durable        assert queue.auto_delete    def test_create_binding(self):        assert self.b._create_binding('id') == self.b.binding    def test_on_task_call(self):        with patch('celery.backends.rpc.maybe_declare') as md:            with self.app.amqp.producer_pool.acquire() as prod:                self.b.on_task_call(prod, 'task_id'),                md.assert_called_with(                    self.b.binding(prod.channel),                    retry=True,                )    def test_create_exchange(self):        ex = self.b._create_exchange('name')        assert isinstance(ex, self.b.Exchange)        assert ex.name == ''
 |