| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 | from __future__ import absolute_import, unicode_literalsimport gcimport sysimport timefrom celery.utils.dispatch import Signalif sys.platform.startswith('java'):    def garbage_collect():        # Some JVM GCs will execute finalizers in a different thread, meaning        # we need to wait for that to complete before we go on looking for the        # effects of that.        gc.collect()        time.sleep(0.1)elif hasattr(sys, 'pypy_version_info'):    def garbage_collect():  # noqa        # Collecting weakreferences can take two collections on PyPy.        gc.collect()        gc.collect()else:    def garbage_collect():  # noqa        gc.collect()def receiver_1_arg(val, **kwargs):    return valclass Callable(object):    def __call__(self, val, **kwargs):        return val    def a(self, val, **kwargs):        return vala_signal = Signal(providing_args=['val'], use_caching=False)class test_Signal:    """Test suite for dispatcher (barely started)"""    def _testIsClean(self, signal):        """Assert that everything has been cleaned up automatically"""        assert not signal.has_listeners()        assert signal.receivers == []    def test_exact(self):        a_signal.connect(receiver_1_arg, sender=self)        try:            expected = [(receiver_1_arg, 'test')]            result = a_signal.send(sender=self, val='test')            assert result == expected        finally:            a_signal.disconnect(receiver_1_arg, sender=self)        self._testIsClean(a_signal)    def test_ignored_sender(self):        a_signal.connect(receiver_1_arg)        try:            expected = [(receiver_1_arg, 'test')]            result = a_signal.send(sender=self, val='test')            assert result == expected        finally:            a_signal.disconnect(receiver_1_arg)        self._testIsClean(a_signal)    def test_garbage_collected(self):        a = Callable()        a_signal.connect(a.a, sender=self)        expected = []        del a        garbage_collect()        result = a_signal.send(sender=self, val='test')        assert result == expected        self._testIsClean(a_signal)    def test_multiple_registration(self):        a = Callable()        result = None        try:            a_signal.connect(a)            a_signal.connect(a)            a_signal.connect(a)            a_signal.connect(a)            a_signal.connect(a)            a_signal.connect(a)            result = a_signal.send(sender=self, val='test')            assert len(result) == 1            assert len(a_signal.receivers) == 1        finally:            del a            del result            garbage_collect()            self._testIsClean(a_signal)    def test_uid_registration(self):        def uid_based_receiver_1(**kwargs):            pass        def uid_based_receiver_2(**kwargs):            pass        a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')        try:            a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')            assert len(a_signal.receivers) == 1        finally:            a_signal.disconnect(dispatch_uid='uid')        self._testIsClean(a_signal)    def test_robust(self):        def fails(val, **kwargs):            raise ValueError('this')        a_signal.connect(fails)        try:            a_signal.send(sender=self, val='test')        finally:            a_signal.disconnect(fails)        self._testIsClean(a_signal)    def test_disconnection(self):        receiver_1 = Callable()        receiver_2 = Callable()        receiver_3 = Callable()        try:            try:                a_signal.connect(receiver_1)                a_signal.connect(receiver_2)                a_signal.connect(receiver_3)            finally:                a_signal.disconnect(receiver_1)            del receiver_2            garbage_collect()        finally:            a_signal.disconnect(receiver_3)        self._testIsClean(a_signal)
 |