| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 | from __future__ import absolute_import, unicode_literalsimport pytestfrom datetime import datetime, timedeltafrom case import Mockfrom kombu import Exchange, Queuefrom celery import uuidfrom celery.app.amqp import Queues, utf8dictfrom celery.five import keysfrom celery.utils.time import to_utcclass test_TaskConsumer:    def test_accept_content(self, app):        with app.pool.acquire(block=True) as con:            app.conf.accept_content = ['application/json']            assert app.amqp.TaskConsumer(con).accept == {                'application/json',            }            assert app.amqp.TaskConsumer(con, accept=['json']).accept == {                'application/json',            }class test_ProducerPool:    def test_setup_nolimit(self, app):        app.conf.broker_pool_limit = None        try:            delattr(app, '_pool')        except AttributeError:            pass        app.amqp._producer_pool = None        pool = app.amqp.producer_pool        assert pool.limit == app.pool.limit        assert not pool._resource.queue        r1 = pool.acquire()        r2 = pool.acquire()        r1.release()        r2.release()        r1 = pool.acquire()        r2 = pool.acquire()    def test_setup(self, app):        app.conf.broker_pool_limit = 2        try:            delattr(app, '_pool')        except AttributeError:            pass        app.amqp._producer_pool = None        pool = app.amqp.producer_pool        assert pool.limit == app.pool.limit        assert pool._resource.queue        p1 = r1 = pool.acquire()        p2 = r2 = pool.acquire()        r1.release()        r2.release()        r1 = pool.acquire()        r2 = pool.acquire()        assert p2 is r1        assert p1 is r2        r1.release()        r2.release()class test_Queues:    def test_queues_format(self):        self.app.amqp.queues._consume_from = {}        assert self.app.amqp.queues.format() == ''    def test_with_defaults(self):        assert Queues(None) == {}    def test_add(self):        q = Queues()        q.add('foo', exchange='ex', routing_key='rk')        assert 'foo' in q        assert isinstance(q['foo'], Queue)        assert q['foo'].routing_key == 'rk'    def test_setitem_adds_default_exchange(self):        q = Queues(default_exchange=Exchange('bar'))        assert q.default_exchange        queue = Queue('foo', exchange=None)        queue.exchange = None        q['foo'] = queue        assert q['foo'].exchange == q.default_exchange    @pytest.mark.parametrize('ha_policy,qname,q,qargs,expected', [        (None, 'xyz', 'xyz', None, None),        (None, 'xyz', 'xyz', {'x-foo': 'bar'}, {'x-foo': 'bar'}),        ('all', 'foo', Queue('foo'), None, {'x-ha-policy': 'all'}),        ('all', 'xyx2',         Queue('xyx2', queue_arguments={'x-foo': 'bari'}),         None,         {'x-ha-policy': 'all', 'x-foo': 'bari'}),        (['A', 'B', 'C'], 'foo', Queue('foo'), None, {            'x-ha-policy': 'nodes',            'x-ha-policy-params': ['A', 'B', 'C']}),    ])    def test_with_ha_policy(self, ha_policy, qname, q, qargs, expected):        queues = Queues(ha_policy=ha_policy, create_missing=False)        queues.add(q, queue_arguments=qargs)        assert queues[qname].queue_arguments == expected    def test_select_add(self):        q = Queues()        q.select(['foo', 'bar'])        q.select_add('baz')        assert sorted(keys(q._consume_from)) == ['bar', 'baz', 'foo']    def test_deselect(self):        q = Queues()        q.select(['foo', 'bar'])        q.deselect('bar')        assert sorted(keys(q._consume_from)) == ['foo']    def test_with_ha_policy_compat(self):        q = Queues(ha_policy='all')        q.add('bar')        assert q['bar'].queue_arguments == {'x-ha-policy': 'all'}    def test_add_default_exchange(self):        ex = Exchange('fff', 'fanout')        q = Queues(default_exchange=ex)        q.add(Queue('foo'))        assert q['foo'].exchange.name == 'fff'    def test_alias(self):        q = Queues()        q.add(Queue('foo', alias='barfoo'))        assert q['barfoo'] is q['foo']    @pytest.mark.parametrize('queues_kwargs,qname,q,expected', [        ({'max_priority': 10},         'foo', 'foo', {'x-max-priority': 10}),        ({'max_priority': 10},         'xyz', Queue('xyz', queue_arguments={'x-max-priority': 3}),         {'x-max-priority': 3}),        ({'max_priority': 10},         'moo', Queue('moo', queue_arguments=None),         {'x-max-priority': 10}),        ({'ha_policy': 'all', 'max_priority': 5},         'bar', 'bar',         {'x-ha-policy': 'all', 'x-max-priority': 5}),        ({'ha_policy': 'all', 'max_priority': 5},         'xyx2', Queue('xyx2', queue_arguments={'x-max-priority': 2}),         {'x-ha-policy': 'all', 'x-max-priority': 2}),        ({'max_priority': None},         'foo2', 'foo2',         None),        ({'max_priority': None},         'xyx3', Queue('xyx3', queue_arguments={'x-max-priority': 7}),         {'x-max-priority': 7}),    ])    def test_with_max_priority(self, queues_kwargs, qname, q, expected):        queues = Queues(**queues_kwargs)        queues.add(q)        assert queues[qname].queue_arguments == expectedclass test_default_queues:    @pytest.mark.parametrize('name,exchange,rkey', [        ('default', None, None),        ('default', 'exchange', None),        ('default', 'exchange', 'routing_key'),        ('default', None, 'routing_key'),    ])    def test_setting_default_queue(self, name, exchange, rkey):        self.app.conf.task_queues = {}        self.app.conf.task_default_exchange = exchange        self.app.conf.task_default_routing_key = rkey        self.app.conf.task_default_queue = name        assert self.app.amqp.queues.default_exchange.name == exchange or name        queues = dict(self.app.amqp.queues)        assert len(queues) == 1        queue = queues[name]        assert queue.exchange.name == exchange or name        assert queue.exchange.type == 'direct'        assert queue.routing_key == rkey or nameclass test_AMQP_proto1:    def test_kwargs_must_be_mapping(self):        with pytest.raises(TypeError):            self.app.amqp.as_task_v1(uuid(), 'foo', kwargs=[1, 2])    def test_args_must_be_list(self):        with pytest.raises(TypeError):            self.app.amqp.as_task_v1(uuid(), 'foo', args='abc')    def test_countdown_negative(self):        with pytest.raises(ValueError):            self.app.amqp.as_task_v1(uuid(), 'foo', countdown=-1232132323123)    def test_as_task_message_without_utc(self):        self.app.amqp.utc = False        self.app.amqp.as_task_v1(uuid(), 'foo', countdown=30, expires=40)class test_AMQP:    def setup(self):        self.simple_message = self.app.amqp.as_task_v2(            uuid(), 'foo', create_sent_event=True,        )        self.simple_message_no_sent_event = self.app.amqp.as_task_v2(            uuid(), 'foo', create_sent_event=False,        )    def test_kwargs_must_be_mapping(self):        with pytest.raises(TypeError):            self.app.amqp.as_task_v2(uuid(), 'foo', kwargs=[1, 2])    def test_args_must_be_list(self):        with pytest.raises(TypeError):            self.app.amqp.as_task_v2(uuid(), 'foo', args='abc')    def test_countdown_negative(self):        with pytest.raises(ValueError):            self.app.amqp.as_task_v2(uuid(), 'foo', countdown=-1232132323123)    def test_Queues__with_ha_policy(self):        x = self.app.amqp.Queues({}, ha_policy='all')        assert x.ha_policy == 'all'    def test_Queues__with_max_priority(self):        x = self.app.amqp.Queues({}, max_priority=23)        assert x.max_priority == 23    def test_send_task_message__no_kwargs(self):        self.app.amqp.send_task_message(Mock(), 'foo', self.simple_message)    def test_send_task_message__properties(self):        prod = Mock(name='producer')        self.app.amqp.send_task_message(            prod, 'foo', self.simple_message_no_sent_event,            foo=1, retry=False,        )        assert prod.publish.call_args[1]['foo'] == 1    def test_send_task_message__headers(self):        prod = Mock(name='producer')        self.app.amqp.send_task_message(            prod, 'foo', self.simple_message_no_sent_event,            headers={'x1x': 'y2x'},            retry=False,        )        assert prod.publish.call_args[1]['headers']['x1x'] == 'y2x'    def test_send_task_message__queue_string(self):        prod = Mock(name='producer')        self.app.amqp.send_task_message(            prod, 'foo', self.simple_message_no_sent_event,            queue='foo', retry=False,        )        kwargs = prod.publish.call_args[1]        assert kwargs['routing_key'] == 'foo'        assert kwargs['exchange'] == ''    def test_send_task_message__broadcast_without_exchange(self):        from kombu.common import Broadcast        evd = Mock(name='evd')        self.app.amqp.send_task_message(            Mock(), 'foo', self.simple_message, retry=False,            routing_key='xyz', queue=Broadcast('abc'),            event_dispatcher=evd,        )        evd.publish.assert_called()        event = evd.publish.call_args[0][1]        assert event['routing_key'] == 'xyz'        assert event['exchange'] == 'abc'    def test_send_event_exchange_direct_with_exchange(self):        prod = Mock(name='prod')        self.app.amqp.send_task_message(            prod, 'foo', self.simple_message_no_sent_event, queue='bar',            retry=False, exchange_type='direct', exchange='xyz',        )        prod.publish.assert_called()        pub = prod.publish.call_args[1]        assert pub['routing_key'] == 'bar'        assert pub['exchange'] == ''    def test_send_event_exchange_direct_with_routing_key(self):        prod = Mock(name='prod')        self.app.amqp.send_task_message(            prod, 'foo', self.simple_message_no_sent_event, queue='bar',            retry=False, exchange_type='direct', routing_key='xyb',        )        prod.publish.assert_called()        pub = prod.publish.call_args[1]        assert pub['routing_key'] == 'bar'        assert pub['exchange'] == ''    def test_send_event_exchange_string(self):        evd = Mock(name='evd')        self.app.amqp.send_task_message(            Mock(), 'foo', self.simple_message, retry=False,            exchange='xyz', routing_key='xyb',            event_dispatcher=evd,        )        evd.publish.assert_called()        event = evd.publish.call_args[0][1]        assert event['routing_key'] == 'xyb'        assert event['exchange'] == 'xyz'    def test_send_task_message__with_delivery_mode(self):        prod = Mock(name='producer')        self.app.amqp.send_task_message(            prod, 'foo', self.simple_message_no_sent_event,            delivery_mode=33, retry=False,        )        assert prod.publish.call_args[1]['delivery_mode'] == 33    def test_send_task_message__with_receivers(self):        from case import patch        mocked_receiver = ((Mock(), Mock()), Mock())        with patch('celery.signals.task_sent.receivers', [mocked_receiver]):            self.app.amqp.send_task_message(Mock(), 'foo', self.simple_message)    def test_routes(self):        r1 = self.app.amqp.routes        r2 = self.app.amqp.routes        assert r1 is r2class test_as_task_v2:    def test_raises_if_args_is_not_tuple(self):        with pytest.raises(TypeError):            self.app.amqp.as_task_v2(uuid(), 'foo', args='123')    def test_raises_if_kwargs_is_not_mapping(self):        with pytest.raises(TypeError):            self.app.amqp.as_task_v2(uuid(), 'foo', kwargs=(1, 2, 3))    def test_countdown_to_eta(self):        now = to_utc(datetime.utcnow()).astimezone(self.app.timezone)        m = self.app.amqp.as_task_v2(            uuid(), 'foo', countdown=10, now=now,        )        assert m.headers['eta'] == (now + timedelta(seconds=10)).isoformat()    def test_expires_to_datetime(self):        now = to_utc(datetime.utcnow()).astimezone(self.app.timezone)        m = self.app.amqp.as_task_v2(            uuid(), 'foo', expires=30, now=now,        )        assert m.headers['expires'] == (            now + timedelta(seconds=30)).isoformat()    def test_callbacks_errbacks_chord(self):        @self.app.task        def t(i):            pass        m = self.app.amqp.as_task_v2(            uuid(), 'foo',            callbacks=[t.s(1), t.s(2)],            errbacks=[t.s(3), t.s(4)],            chord=t.s(5),        )        _, _, embed = m.body        assert embed['callbacks'] == [utf8dict(t.s(1)), utf8dict(t.s(2))]        assert embed['errbacks'] == [utf8dict(t.s(3)), utf8dict(t.s(4))]        assert embed['chord'] == utf8dict(t.s(5))
 |