test_dispatcher.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. from __future__ import absolute_import, unicode_literals
  2. import gc
  3. import sys
  4. import time
  5. from celery.utils.dispatch import Signal
  6. if sys.platform.startswith('java'):
  7. def garbage_collect():
  8. # Some JVM GCs will execute finalizers in a different thread, meaning
  9. # we need to wait for that to complete before we go on looking for the
  10. # effects of that.
  11. gc.collect()
  12. time.sleep(0.1)
  13. elif hasattr(sys, 'pypy_version_info'):
  14. def garbage_collect(): # noqa
  15. # Collecting weakreferences can take two collections on PyPy.
  16. gc.collect()
  17. gc.collect()
  18. else:
  19. def garbage_collect(): # noqa
  20. gc.collect()
  21. def receiver_1_arg(val, **kwargs):
  22. return val
  23. class Callable(object):
  24. def __call__(self, val, **kwargs):
  25. return val
  26. def a(self, val, **kwargs):
  27. return val
  28. a_signal = Signal(providing_args=['val'], use_caching=False)
  29. class test_Signal:
  30. """Test suite for dispatcher (barely started)"""
  31. def _testIsClean(self, signal):
  32. """Assert that everything has been cleaned up automatically"""
  33. assert not signal.has_listeners()
  34. assert signal.receivers == []
  35. def test_exact(self):
  36. a_signal.connect(receiver_1_arg, sender=self)
  37. try:
  38. expected = [(receiver_1_arg, 'test')]
  39. result = a_signal.send(sender=self, val='test')
  40. assert result == expected
  41. finally:
  42. a_signal.disconnect(receiver_1_arg, sender=self)
  43. self._testIsClean(a_signal)
  44. def test_ignored_sender(self):
  45. a_signal.connect(receiver_1_arg)
  46. try:
  47. expected = [(receiver_1_arg, 'test')]
  48. result = a_signal.send(sender=self, val='test')
  49. assert result == expected
  50. finally:
  51. a_signal.disconnect(receiver_1_arg)
  52. self._testIsClean(a_signal)
  53. def test_garbage_collected(self):
  54. a = Callable()
  55. a_signal.connect(a.a, sender=self)
  56. expected = []
  57. del a
  58. garbage_collect()
  59. result = a_signal.send(sender=self, val='test')
  60. assert result == expected
  61. self._testIsClean(a_signal)
  62. def test_multiple_registration(self):
  63. a = Callable()
  64. result = None
  65. try:
  66. a_signal.connect(a)
  67. a_signal.connect(a)
  68. a_signal.connect(a)
  69. a_signal.connect(a)
  70. a_signal.connect(a)
  71. a_signal.connect(a)
  72. result = a_signal.send(sender=self, val='test')
  73. assert len(result) == 1
  74. assert len(a_signal.receivers) == 1
  75. finally:
  76. del a
  77. del result
  78. garbage_collect()
  79. self._testIsClean(a_signal)
  80. def test_uid_registration(self):
  81. def uid_based_receiver_1(**kwargs):
  82. pass
  83. def uid_based_receiver_2(**kwargs):
  84. pass
  85. a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')
  86. try:
  87. a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')
  88. assert len(a_signal.receivers) == 1
  89. finally:
  90. a_signal.disconnect(dispatch_uid='uid')
  91. self._testIsClean(a_signal)
  92. def test_robust(self):
  93. def fails(val, **kwargs):
  94. raise ValueError('this')
  95. a_signal.connect(fails)
  96. try:
  97. a_signal.send(sender=self, val='test')
  98. finally:
  99. a_signal.disconnect(fails)
  100. self._testIsClean(a_signal)
  101. def test_disconnection(self):
  102. receiver_1 = Callable()
  103. receiver_2 = Callable()
  104. receiver_3 = Callable()
  105. try:
  106. try:
  107. a_signal.connect(receiver_1)
  108. a_signal.connect(receiver_2)
  109. a_signal.connect(receiver_3)
  110. finally:
  111. a_signal.disconnect(receiver_1)
  112. del receiver_2
  113. garbage_collect()
  114. finally:
  115. a_signal.disconnect(receiver_3)
  116. self._testIsClean(a_signal)
  117. def test_retry(self):
  118. class non_local:
  119. counter = 1
  120. def succeeds_eventually(val, **kwargs):
  121. non_local.counter += 1
  122. if non_local.counter < 3:
  123. raise ValueError('this')
  124. return val
  125. a_signal.connect(succeeds_eventually, sender=self, retry=True)
  126. try:
  127. result = a_signal.send(sender=self, val='test')
  128. assert non_local.counter == 3
  129. assert result[0][1] == 'test'
  130. finally:
  131. a_signal.disconnect(succeeds_eventually, sender=self)
  132. self._testIsClean(a_signal)
  133. def test_retry_with_dispatch_uid(self):
  134. uid = 'abc123'
  135. a_signal.connect(receiver_1_arg, sender=self, retry=True,
  136. dispatch_uid=uid)
  137. assert a_signal.receivers[0][0][0] == uid
  138. a_signal.disconnect(receiver_1_arg, sender=self, dispatch_uid=uid)
  139. self._testIsClean(a_signal)
  140. def test_boundmethod(self):
  141. a = Callable()
  142. a_signal.connect(a.a, sender=self)
  143. expected = [(a.a, 'test')]
  144. garbage_collect()
  145. result = a_signal.send(sender=self, val='test')
  146. assert result == expected
  147. del a, result, expected
  148. garbage_collect()
  149. self._testIsClean(a_signal)