test_dispatcher.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. from __future__ import absolute_import, unicode_literals
  2. import gc
  3. import sys
  4. import time
  5. from celery.utils.dispatch import Signal
  6. if sys.platform.startswith('java'):
  7. def garbage_collect():
  8. # Some JVM GCs will execute finalizers in a different thread, meaning
  9. # we need to wait for that to complete before we go on looking for the
  10. # effects of that.
  11. gc.collect()
  12. time.sleep(0.1)
  13. elif hasattr(sys, 'pypy_version_info'):
  14. def garbage_collect(): # noqa
  15. # Collecting weakreferences can take two collections on PyPy.
  16. gc.collect()
  17. gc.collect()
  18. else:
  19. def garbage_collect(): # noqa
  20. gc.collect()
  21. def receiver_1_arg(val, **kwargs):
  22. return val
  23. class Callable(object):
  24. def __call__(self, val, **kwargs):
  25. return val
  26. def a(self, val, **kwargs):
  27. return val
  28. a_signal = Signal(providing_args=['val'])
  29. class test_Signal:
  30. """Test suite for dispatcher (barely started)"""
  31. def _testIsClean(self, signal):
  32. """Assert that everything has been cleaned up automatically"""
  33. assert signal.receivers == []
  34. # force cleanup just in case
  35. signal.receivers = []
  36. def test_exact(self):
  37. a_signal.connect(receiver_1_arg, sender=self)
  38. try:
  39. expected = [(receiver_1_arg, 'test')]
  40. result = a_signal.send(sender=self, val='test')
  41. assert result == expected
  42. finally:
  43. a_signal.disconnect(receiver_1_arg, sender=self)
  44. self._testIsClean(a_signal)
  45. def test_ignored_sender(self):
  46. a_signal.connect(receiver_1_arg)
  47. try:
  48. expected = [(receiver_1_arg, 'test')]
  49. result = a_signal.send(sender=self, val='test')
  50. assert result == expected
  51. finally:
  52. a_signal.disconnect(receiver_1_arg)
  53. self._testIsClean(a_signal)
  54. def test_garbage_collected(self):
  55. a = Callable()
  56. a_signal.connect(a.a, sender=self)
  57. expected = []
  58. del a
  59. garbage_collect()
  60. result = a_signal.send(sender=self, val='test')
  61. assert result == expected
  62. self._testIsClean(a_signal)
  63. def test_multiple_registration(self):
  64. a = Callable()
  65. result = None
  66. try:
  67. a_signal.connect(a)
  68. a_signal.connect(a)
  69. a_signal.connect(a)
  70. a_signal.connect(a)
  71. a_signal.connect(a)
  72. a_signal.connect(a)
  73. result = a_signal.send(sender=self, val='test')
  74. assert len(result) == 1
  75. assert len(a_signal.receivers) == 1
  76. finally:
  77. del a
  78. del result
  79. garbage_collect()
  80. self._testIsClean(a_signal)
  81. def test_uid_registration(self):
  82. def uid_based_receiver_1(**kwargs):
  83. pass
  84. def uid_based_receiver_2(**kwargs):
  85. pass
  86. a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')
  87. try:
  88. a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')
  89. assert len(a_signal.receivers) == 1
  90. finally:
  91. a_signal.disconnect(dispatch_uid='uid')
  92. self._testIsClean(a_signal)
  93. def test_robust(self):
  94. def fails(val, **kwargs):
  95. raise ValueError('this')
  96. a_signal.connect(fails)
  97. try:
  98. a_signal.send(sender=self, val='test')
  99. finally:
  100. a_signal.disconnect(fails)
  101. self._testIsClean(a_signal)
  102. def test_disconnection(self):
  103. receiver_1 = Callable()
  104. receiver_2 = Callable()
  105. receiver_3 = Callable()
  106. try:
  107. try:
  108. a_signal.connect(receiver_1)
  109. a_signal.connect(receiver_2)
  110. a_signal.connect(receiver_3)
  111. finally:
  112. a_signal.disconnect(receiver_1)
  113. del receiver_2
  114. garbage_collect()
  115. finally:
  116. a_signal.disconnect(receiver_3)
  117. self._testIsClean(a_signal)