| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 | 
							- from __future__ import absolute_import, unicode_literals
 
- import gc
 
- import sys
 
- import time
 
- from celery.utils.dispatch import Signal
 
- if sys.platform.startswith('java'):
 
-     def garbage_collect():
 
-         # Some JVM GCs will execute finalizers in a different thread, meaning
 
-         # we need to wait for that to complete before we go on looking for the
 
-         # effects of that.
 
-         gc.collect()
 
-         time.sleep(0.1)
 
- elif hasattr(sys, 'pypy_version_info'):
 
-     def garbage_collect():  # noqa
 
-         # Collecting weakreferences can take two collections on PyPy.
 
-         gc.collect()
 
-         gc.collect()
 
- else:
 
-     def garbage_collect():  # noqa
 
-         gc.collect()
 
- def receiver_1_arg(val, **kwargs):
 
-     return val
 
- class Callable(object):
 
-     def __call__(self, val, **kwargs):
 
-         return val
 
-     def a(self, val, **kwargs):
 
-         return val
 
- a_signal = Signal(providing_args=['val'], use_caching=False)
 
- class test_Signal:
 
-     """Test suite for dispatcher (barely started)"""
 
-     def _testIsClean(self, signal):
 
-         """Assert that everything has been cleaned up automatically"""
 
-         assert not signal.has_listeners()
 
-         assert signal.receivers == []
 
-     def test_exact(self):
 
-         a_signal.connect(receiver_1_arg, sender=self)
 
-         try:
 
-             expected = [(receiver_1_arg, 'test')]
 
-             result = a_signal.send(sender=self, val='test')
 
-             assert result == expected
 
-         finally:
 
-             a_signal.disconnect(receiver_1_arg, sender=self)
 
-         self._testIsClean(a_signal)
 
-     def test_ignored_sender(self):
 
-         a_signal.connect(receiver_1_arg)
 
-         try:
 
-             expected = [(receiver_1_arg, 'test')]
 
-             result = a_signal.send(sender=self, val='test')
 
-             assert result == expected
 
-         finally:
 
-             a_signal.disconnect(receiver_1_arg)
 
-         self._testIsClean(a_signal)
 
-     def test_garbage_collected(self):
 
-         a = Callable()
 
-         a_signal.connect(a.a, sender=self)
 
-         expected = []
 
-         del a
 
-         garbage_collect()
 
-         result = a_signal.send(sender=self, val='test')
 
-         assert result == expected
 
-         self._testIsClean(a_signal)
 
-     def test_multiple_registration(self):
 
-         a = Callable()
 
-         result = None
 
-         try:
 
-             a_signal.connect(a)
 
-             a_signal.connect(a)
 
-             a_signal.connect(a)
 
-             a_signal.connect(a)
 
-             a_signal.connect(a)
 
-             a_signal.connect(a)
 
-             result = a_signal.send(sender=self, val='test')
 
-             assert len(result) == 1
 
-             assert len(a_signal.receivers) == 1
 
-         finally:
 
-             del a
 
-             del result
 
-             garbage_collect()
 
-             self._testIsClean(a_signal)
 
-     def test_uid_registration(self):
 
-         def uid_based_receiver_1(**kwargs):
 
-             pass
 
-         def uid_based_receiver_2(**kwargs):
 
-             pass
 
-         a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')
 
-         try:
 
-             a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')
 
-             assert len(a_signal.receivers) == 1
 
-         finally:
 
-             a_signal.disconnect(dispatch_uid='uid')
 
-         self._testIsClean(a_signal)
 
-     def test_robust(self):
 
-         def fails(val, **kwargs):
 
-             raise ValueError('this')
 
-         a_signal.connect(fails)
 
-         try:
 
-             a_signal.send(sender=self, val='test')
 
-         finally:
 
-             a_signal.disconnect(fails)
 
-         self._testIsClean(a_signal)
 
-     def test_disconnection(self):
 
-         receiver_1 = Callable()
 
-         receiver_2 = Callable()
 
-         receiver_3 = Callable()
 
-         try:
 
-             try:
 
-                 a_signal.connect(receiver_1)
 
-                 a_signal.connect(receiver_2)
 
-                 a_signal.connect(receiver_3)
 
-             finally:
 
-                 a_signal.disconnect(receiver_1)
 
-             del receiver_2
 
-             garbage_collect()
 
-         finally:
 
-             a_signal.disconnect(receiver_3)
 
-         self._testIsClean(a_signal)
 
-     def test_retry(self):
 
-         class non_local:
 
-             counter = 1
 
-         def succeeds_eventually(val, **kwargs):
 
-             non_local.counter += 1
 
-             if non_local.counter < 3:
 
-                 raise ValueError('this')
 
-             return val
 
-         a_signal.connect(succeeds_eventually, sender=self, retry=True)
 
-         try:
 
-             result = a_signal.send(sender=self, val='test')
 
-             assert non_local.counter == 3
 
-             assert result[0][1] == 'test'
 
-         finally:
 
-             a_signal.disconnect(succeeds_eventually, sender=self)
 
-         self._testIsClean(a_signal)
 
-     def test_retry_with_dispatch_uid(self):
 
-         uid = 'abc123'
 
-         a_signal.connect(receiver_1_arg, sender=self, retry=True,
 
-                          dispatch_uid=uid)
 
-         assert a_signal.receivers[0][0][0] == uid
 
-         a_signal.disconnect(receiver_1_arg, sender=self, dispatch_uid=uid)
 
-         self._testIsClean(a_signal)
 
-     def test_boundmethod(self):
 
-         a = Callable()
 
-         a_signal.connect(a.a, sender=self)
 
-         expected = [(a.a, 'test')]
 
-         garbage_collect()
 
-         result = a_signal.send(sender=self, val='test')
 
-         assert result == expected
 
-         del a, result, expected
 
-         garbage_collect()
 
-         self._testIsClean(a_signal)
 
 
  |