test_dispatcher.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import gc
  2. import sys
  3. import time
  4. from celery.utils.dispatch import Signal
  5. if sys.platform.startswith('java'):
  6. def garbage_collect():
  7. # Some JVM GCs will execute finalizers in a different thread, meaning
  8. # we need to wait for that to complete before we go on looking for the
  9. # effects of that.
  10. gc.collect()
  11. time.sleep(0.1)
  12. elif hasattr(sys, 'pypy_version_info'):
  13. def garbage_collect(): # noqa
  14. # Collecting weakreferences can take two collections on PyPy.
  15. gc.collect()
  16. gc.collect()
  17. else:
  18. def garbage_collect(): # noqa
  19. gc.collect()
  20. def receiver_1_arg(val, **kwargs):
  21. return val
  22. class Callable:
  23. def __call__(self, val, **kwargs):
  24. return val
  25. def a(self, val, **kwargs):
  26. return val
  27. a_signal = Signal(providing_args=['val'], use_caching=False)
  28. class test_Signal:
  29. """Test suite for dispatcher (barely started)"""
  30. def _testIsClean(self, signal):
  31. """Assert that everything has been cleaned up automatically"""
  32. assert not signal.has_listeners()
  33. assert signal.receivers == []
  34. def test_exact(self):
  35. a_signal.connect(receiver_1_arg, sender=self)
  36. try:
  37. expected = [(receiver_1_arg, 'test')]
  38. result = a_signal.send(sender=self, val='test')
  39. assert result == expected
  40. finally:
  41. a_signal.disconnect(receiver_1_arg, sender=self)
  42. self._testIsClean(a_signal)
  43. def test_ignored_sender(self):
  44. a_signal.connect(receiver_1_arg)
  45. try:
  46. expected = [(receiver_1_arg, 'test')]
  47. result = a_signal.send(sender=self, val='test')
  48. assert result == expected
  49. finally:
  50. a_signal.disconnect(receiver_1_arg)
  51. self._testIsClean(a_signal)
  52. def test_garbage_collected(self):
  53. a = Callable()
  54. a_signal.connect(a.a, sender=self)
  55. expected = []
  56. del a
  57. garbage_collect()
  58. result = a_signal.send(sender=self, val='test')
  59. assert result == expected
  60. self._testIsClean(a_signal)
  61. def test_multiple_registration(self):
  62. a = Callable()
  63. result = None
  64. try:
  65. a_signal.connect(a)
  66. a_signal.connect(a)
  67. a_signal.connect(a)
  68. a_signal.connect(a)
  69. a_signal.connect(a)
  70. a_signal.connect(a)
  71. result = a_signal.send(sender=self, val='test')
  72. assert len(result) == 1
  73. assert len(a_signal.receivers) == 1
  74. finally:
  75. del a
  76. del result
  77. garbage_collect()
  78. self._testIsClean(a_signal)
  79. def test_uid_registration(self):
  80. def uid_based_receiver_1(**kwargs):
  81. pass
  82. def uid_based_receiver_2(**kwargs):
  83. pass
  84. a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')
  85. try:
  86. a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')
  87. assert len(a_signal.receivers) == 1
  88. finally:
  89. a_signal.disconnect(dispatch_uid='uid')
  90. self._testIsClean(a_signal)
  91. def test_robust(self):
  92. def fails(val, **kwargs):
  93. raise ValueError('this')
  94. a_signal.connect(fails)
  95. try:
  96. a_signal.send(sender=self, val='test')
  97. finally:
  98. a_signal.disconnect(fails)
  99. self._testIsClean(a_signal)
  100. def test_disconnection(self):
  101. receiver_1 = Callable()
  102. receiver_2 = Callable()
  103. receiver_3 = Callable()
  104. try:
  105. try:
  106. a_signal.connect(receiver_1)
  107. a_signal.connect(receiver_2)
  108. a_signal.connect(receiver_3)
  109. finally:
  110. a_signal.disconnect(receiver_1)
  111. del receiver_2
  112. garbage_collect()
  113. finally:
  114. a_signal.disconnect(receiver_3)
  115. self._testIsClean(a_signal)