test_dispatcher.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. from __future__ import absolute_import, unicode_literals
  2. import gc
  3. import sys
  4. import time
  5. from celery.utils.dispatch import Signal
  6. if sys.platform.startswith('java'):
  7. def garbage_collect():
  8. # Some JVM GCs will execute finalizers in a different thread, meaning
  9. # we need to wait for that to complete before we go on looking for the
  10. # effects of that.
  11. gc.collect()
  12. time.sleep(0.1)
  13. elif hasattr(sys, 'pypy_version_info'):
  14. def garbage_collect(): # noqa
  15. # Collecting weakreferences can take two collections on PyPy.
  16. gc.collect()
  17. gc.collect()
  18. else:
  19. def garbage_collect(): # noqa
  20. gc.collect()
  21. def receiver_1_arg(val, **kwargs):
  22. return val
  23. class Callable(object):
  24. def __call__(self, val, **kwargs):
  25. return val
  26. def a(self, val, **kwargs):
  27. return val
  28. a_signal = Signal(providing_args=['val'], use_caching=False)
  29. class test_Signal:
  30. """Test suite for dispatcher (barely started)"""
  31. def _testIsClean(self, signal):
  32. """Assert that everything has been cleaned up automatically"""
  33. assert not signal.has_listeners()
  34. assert signal.receivers == []
  35. def test_exact(self):
  36. a_signal.connect(receiver_1_arg, sender=self)
  37. try:
  38. expected = [(receiver_1_arg, 'test')]
  39. result = a_signal.send(sender=self, val='test')
  40. assert result == expected
  41. finally:
  42. a_signal.disconnect(receiver_1_arg, sender=self)
  43. self._testIsClean(a_signal)
  44. def test_ignored_sender(self):
  45. a_signal.connect(receiver_1_arg)
  46. try:
  47. expected = [(receiver_1_arg, 'test')]
  48. result = a_signal.send(sender=self, val='test')
  49. assert result == expected
  50. finally:
  51. a_signal.disconnect(receiver_1_arg)
  52. self._testIsClean(a_signal)
  53. def test_garbage_collected(self):
  54. a = Callable()
  55. a_signal.connect(a.a, sender=self)
  56. expected = []
  57. del a
  58. garbage_collect()
  59. result = a_signal.send(sender=self, val='test')
  60. assert result == expected
  61. self._testIsClean(a_signal)
  62. def test_multiple_registration(self):
  63. a = Callable()
  64. result = None
  65. try:
  66. a_signal.connect(a)
  67. a_signal.connect(a)
  68. a_signal.connect(a)
  69. a_signal.connect(a)
  70. a_signal.connect(a)
  71. a_signal.connect(a)
  72. result = a_signal.send(sender=self, val='test')
  73. assert len(result) == 1
  74. assert len(a_signal.receivers) == 1
  75. finally:
  76. del a
  77. del result
  78. garbage_collect()
  79. self._testIsClean(a_signal)
  80. def test_uid_registration(self):
  81. def uid_based_receiver_1(**kwargs):
  82. pass
  83. def uid_based_receiver_2(**kwargs):
  84. pass
  85. a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')
  86. try:
  87. a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')
  88. assert len(a_signal.receivers) == 1
  89. finally:
  90. a_signal.disconnect(dispatch_uid='uid')
  91. self._testIsClean(a_signal)
  92. def test_robust(self):
  93. def fails(val, **kwargs):
  94. raise ValueError('this')
  95. a_signal.connect(fails)
  96. try:
  97. a_signal.send(sender=self, val='test')
  98. finally:
  99. a_signal.disconnect(fails)
  100. self._testIsClean(a_signal)
  101. def test_disconnection(self):
  102. receiver_1 = Callable()
  103. receiver_2 = Callable()
  104. receiver_3 = Callable()
  105. try:
  106. try:
  107. a_signal.connect(receiver_1)
  108. a_signal.connect(receiver_2)
  109. a_signal.connect(receiver_3)
  110. finally:
  111. a_signal.disconnect(receiver_1)
  112. del receiver_2
  113. garbage_collect()
  114. finally:
  115. a_signal.disconnect(receiver_3)
  116. self._testIsClean(a_signal)