test_timer2.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. from __future__ import with_statement
  2. import sys
  3. import time
  4. import warnings
  5. from kombu.tests.utils import redirect_stdouts
  6. from mock import Mock, patch
  7. import celery.utils.timer2 as timer2
  8. from celery.tests.utils import unittest, skip_if_quick
  9. from celery.tests.compat import catch_warnings
  10. class test_Entry(unittest.TestCase):
  11. def test_call(self):
  12. scratch = [None]
  13. def timed(x, y, moo="foo"):
  14. scratch[0] = (x, y, moo)
  15. tref = timer2.Entry(timed, (4, 4), {"moo": "baz"})
  16. tref()
  17. self.assertTupleEqual(scratch[0], (4, 4, "baz"))
  18. def test_cancel(self):
  19. tref = timer2.Entry(lambda x: x, (1, ), {})
  20. tref.cancel()
  21. self.assertTrue(tref.cancelled)
  22. class test_Schedule(unittest.TestCase):
  23. def test_handle_error(self):
  24. from datetime import datetime
  25. mktime = timer2.mktime
  26. scratch = [None]
  27. def _overflow(x):
  28. raise OverflowError(x)
  29. def on_error(exc_info):
  30. scratch[0] = exc_info
  31. s = timer2.Schedule(on_error=on_error)
  32. timer2.mktime = _overflow
  33. try:
  34. s.enter(timer2.Entry(lambda: None, (), {}),
  35. eta=datetime.now())
  36. s.enter(timer2.Entry(lambda: None, (), {}),
  37. eta=None)
  38. s.on_error = None
  39. with self.assertRaises(OverflowError):
  40. s.enter(timer2.Entry(lambda: None, (), {}),
  41. eta=datetime.now())
  42. finally:
  43. timer2.mktime = mktime
  44. _, exc, _ = scratch[0]
  45. self.assertIsInstance(exc, OverflowError)
  46. class test_Timer(unittest.TestCase):
  47. @skip_if_quick
  48. def test_enter_after(self):
  49. t = timer2.Timer()
  50. done = [False]
  51. def set_done():
  52. done[0] = True
  53. try:
  54. t.apply_after(300, set_done)
  55. while not done[0]:
  56. time.sleep(0.1)
  57. finally:
  58. t.stop()
  59. def test_exit_after(self):
  60. t = timer2.Timer()
  61. t.apply_after = Mock()
  62. t.exit_after(300, priority=10)
  63. t.apply_after.assert_called_with(300, sys.exit, 10)
  64. def test_apply_interval(self):
  65. t = timer2.Timer()
  66. t.enter_after = Mock()
  67. myfun = Mock()
  68. t.apply_interval(30, myfun)
  69. self.assertEqual(t.enter_after.call_count, 1)
  70. args1, _ = t.enter_after.call_args_list[0]
  71. msec1, tref1, _ = args1
  72. self.assertEqual(msec1, 30)
  73. tref1()
  74. self.assertEqual(t.enter_after.call_count, 2)
  75. args2, _ = t.enter_after.call_args_list[1]
  76. msec2, tref2, _ = args2
  77. self.assertEqual(msec2, 30)
  78. tref2.cancelled = True
  79. tref2()
  80. self.assertEqual(t.enter_after.call_count, 2)
  81. @redirect_stdouts
  82. def test_apply_entry_error_handled(self, stdout, stderr):
  83. t = timer2.Timer()
  84. t.schedule.on_error = None
  85. fun = Mock()
  86. fun.side_effect = ValueError()
  87. warnings.resetwarnings()
  88. with catch_warnings(record=True) as log:
  89. t.apply_entry(fun)
  90. fun.assert_called_with()
  91. self.assertTrue(log)
  92. self.assertTrue(stderr.getvalue())
  93. @redirect_stdouts
  94. def test_apply_entry_error_not_handled(self, stdout, stderr):
  95. t = timer2.Timer()
  96. t.schedule.on_error = Mock()
  97. fun = Mock()
  98. fun.side_effect = ValueError()
  99. warnings.resetwarnings()
  100. with catch_warnings(record=True) as log:
  101. t.apply_entry(fun)
  102. fun.assert_called_with()
  103. self.assertFalse(log)
  104. self.assertFalse(stderr.getvalue())
  105. @patch("os._exit")
  106. def test_thread_crash(self, _exit):
  107. t = timer2.Timer()
  108. t.next = Mock()
  109. t.next.side_effect = OSError(131)
  110. t.run()
  111. _exit.assert_called_with(1)
  112. def test_gc_race_lost(self):
  113. t = timer2.Timer()
  114. t._stopped.set = Mock()
  115. t._stopped.set.side_effect = TypeError()
  116. t._shutdown.set()
  117. t.run()
  118. t._stopped.set.assert_called_with()