test_dispatcher.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. from __future__ import absolute_import
  2. import gc
  3. import sys
  4. import time
  5. from celery.utils.dispatch import Signal
  6. from celery.tests.utils import Case
  7. if sys.platform.startswith('java'):
  8. def garbage_collect():
  9. # Some JVM GCs will execute finalizers in a different thread, meaning
  10. # we need to wait for that to complete before we go on looking for the
  11. # effects of that.
  12. gc.collect()
  13. time.sleep(0.1)
  14. elif hasattr(sys, 'pypy_version_info'):
  15. def garbage_collect(): # noqa
  16. # Collecting weakreferences can take two collections on PyPy.
  17. gc.collect()
  18. gc.collect()
  19. else:
  20. def garbage_collect(): # noqa
  21. gc.collect()
  22. def receiver_1_arg(val, **kwargs):
  23. return val
  24. class Callable(object):
  25. def __call__(self, val, **kwargs):
  26. return val
  27. def a(self, val, **kwargs):
  28. return val
  29. a_signal = Signal(providing_args=['val'])
  30. class DispatcherTests(Case):
  31. """Test suite for dispatcher (barely started)"""
  32. def _testIsClean(self, signal):
  33. """Assert that everything has been cleaned up automatically"""
  34. self.assertEqual(signal.receivers, [])
  35. # force cleanup just in case
  36. signal.receivers = []
  37. def testExact(self):
  38. a_signal.connect(receiver_1_arg, sender=self)
  39. expected = [(receiver_1_arg, 'test')]
  40. result = a_signal.send(sender=self, val='test')
  41. self.assertEqual(result, expected)
  42. a_signal.disconnect(receiver_1_arg, sender=self)
  43. self._testIsClean(a_signal)
  44. def testIgnoredSender(self):
  45. a_signal.connect(receiver_1_arg)
  46. expected = [(receiver_1_arg, 'test')]
  47. result = a_signal.send(sender=self, val='test')
  48. self.assertEqual(result, expected)
  49. a_signal.disconnect(receiver_1_arg)
  50. self._testIsClean(a_signal)
  51. def testGarbageCollected(self):
  52. a = Callable()
  53. a_signal.connect(a.a, sender=self)
  54. expected = []
  55. del a
  56. garbage_collect()
  57. result = a_signal.send(sender=self, val='test')
  58. self.assertEqual(result, expected)
  59. self._testIsClean(a_signal)
  60. def testMultipleRegistration(self):
  61. a = Callable()
  62. a_signal.connect(a)
  63. a_signal.connect(a)
  64. a_signal.connect(a)
  65. a_signal.connect(a)
  66. a_signal.connect(a)
  67. a_signal.connect(a)
  68. result = a_signal.send(sender=self, val='test')
  69. self.assertEqual(len(result), 1)
  70. self.assertEqual(len(a_signal.receivers), 1)
  71. del a
  72. del result
  73. garbage_collect()
  74. self._testIsClean(a_signal)
  75. def testUidRegistration(self):
  76. def uid_based_receiver_1(**kwargs):
  77. pass
  78. def uid_based_receiver_2(**kwargs):
  79. pass
  80. a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')
  81. a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')
  82. self.assertEqual(len(a_signal.receivers), 1)
  83. a_signal.disconnect(dispatch_uid='uid')
  84. self._testIsClean(a_signal)
  85. def testRobust(self):
  86. """Test the sendRobust function"""
  87. def fails(val, **kwargs):
  88. raise ValueError('this')
  89. a_signal.connect(fails)
  90. result = a_signal.send_robust(sender=self, val='test')
  91. err = result[0][1]
  92. self.assertTrue(isinstance(err, ValueError))
  93. self.assertEqual(err.args, ('this',))
  94. a_signal.disconnect(fails)
  95. self._testIsClean(a_signal)
  96. def testDisconnection(self):
  97. receiver_1 = Callable()
  98. receiver_2 = Callable()
  99. receiver_3 = Callable()
  100. a_signal.connect(receiver_1)
  101. a_signal.connect(receiver_2)
  102. a_signal.connect(receiver_3)
  103. a_signal.disconnect(receiver_1)
  104. del receiver_2
  105. garbage_collect()
  106. a_signal.disconnect(receiver_3)
  107. self._testIsClean(a_signal)