| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 | from __future__ import absolute_import, unicode_literalsimport gcimport sysimport timefrom celery.utils.dispatch import Signalif sys.platform.startswith('java'):    def garbage_collect():        # Some JVM GCs will execute finalizers in a different thread, meaning        # we need to wait for that to complete before we go on looking for the        # effects of that.        gc.collect()        time.sleep(0.1)elif hasattr(sys, 'pypy_version_info'):    def garbage_collect():  # noqa        # Collecting weakreferences can take two collections on PyPy.        gc.collect()        gc.collect()else:    def garbage_collect():  # noqa        gc.collect()def receiver_1_arg(val, **kwargs):    return valclass Callable(object):    def __call__(self, val, **kwargs):        return val    def a(self, val, **kwargs):        return vala_signal = Signal(providing_args=['val'], use_caching=False)class test_Signal:    """Test suite for dispatcher (barely started)"""    def _testIsClean(self, signal):        """Assert that everything has been cleaned up automatically"""        assert not signal.has_listeners()        assert signal.receivers == []    def test_exact(self):        a_signal.connect(receiver_1_arg, sender=self)        try:            expected = [(receiver_1_arg, 'test')]            result = a_signal.send(sender=self, val='test')            assert result == expected        finally:            a_signal.disconnect(receiver_1_arg, sender=self)        self._testIsClean(a_signal)    def test_ignored_sender(self):        a_signal.connect(receiver_1_arg)        try:            expected = [(receiver_1_arg, 'test')]            result = a_signal.send(sender=self, val='test')            assert result == expected        finally:            a_signal.disconnect(receiver_1_arg)        self._testIsClean(a_signal)    def test_garbage_collected(self):        a = Callable()        a_signal.connect(a.a, sender=self)        expected = []        del a        garbage_collect()        result = a_signal.send(sender=self, val='test')        assert result == expected        self._testIsClean(a_signal)    def test_multiple_registration(self):        a = Callable()        result = None        try:            a_signal.connect(a)            a_signal.connect(a)            a_signal.connect(a)            a_signal.connect(a)            a_signal.connect(a)            a_signal.connect(a)            result = a_signal.send(sender=self, val='test')            assert len(result) == 1            assert len(a_signal.receivers) == 1        finally:            del a            del result            garbage_collect()            self._testIsClean(a_signal)    def test_uid_registration(self):        def uid_based_receiver_1(**kwargs):            pass        def uid_based_receiver_2(**kwargs):            pass        a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')        try:            a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')            assert len(a_signal.receivers) == 1        finally:            a_signal.disconnect(dispatch_uid='uid')        self._testIsClean(a_signal)    def test_robust(self):        def fails(val, **kwargs):            raise ValueError('this')        a_signal.connect(fails)        try:            a_signal.send(sender=self, val='test')        finally:            a_signal.disconnect(fails)        self._testIsClean(a_signal)    def test_disconnection(self):        receiver_1 = Callable()        receiver_2 = Callable()        receiver_3 = Callable()        try:            try:                a_signal.connect(receiver_1)                a_signal.connect(receiver_2)                a_signal.connect(receiver_3)            finally:                a_signal.disconnect(receiver_1)            del receiver_2            garbage_collect()        finally:            a_signal.disconnect(receiver_3)        self._testIsClean(a_signal)    def test_retry(self):        class non_local:            counter = 1        def succeeds_eventually(val, **kwargs):            non_local.counter += 1            if non_local.counter < 3:                raise ValueError('this')            return val        a_signal.connect(succeeds_eventually, sender=self, retry=True)        try:            result = a_signal.send(sender=self, val='test')            assert non_local.counter == 3            assert result[0][1] == 'test'        finally:            a_signal.disconnect(succeeds_eventually, sender=self)        self._testIsClean(a_signal)    def test_retry_with_dispatch_uid(self):        uid = 'abc123'        a_signal.connect(receiver_1_arg, sender=self, retry=True,                         dispatch_uid=uid)        assert a_signal.receivers[0][0][0] == uid        a_signal.disconnect(receiver_1_arg, sender=self, dispatch_uid=uid)        self._testIsClean(a_signal)    def test_boundmethod(self):        a = Callable()        a_signal.connect(a.a, sender=self)        expected = [(a.a, 'test')]        garbage_collect()        result = a_signal.send(sender=self, val='test')        assert result == expected        del a, result, expected        garbage_collect()        self._testIsClean(a_signal)
 |