test_timer2.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from __future__ import absolute_import
  2. import sys
  3. import time
  4. from kombu.tests.utils import redirect_stdouts
  5. from mock import Mock, patch
  6. import celery.utils.timer2 as timer2
  7. from celery.tests.utils import Case, skip_if_quick
  8. class test_Entry(Case):
  9. def test_call(self):
  10. scratch = [None]
  11. def timed(x, y, moo='foo'):
  12. scratch[0] = (x, y, moo)
  13. tref = timer2.Entry(timed, (4, 4), {'moo': 'baz'})
  14. tref()
  15. self.assertTupleEqual(scratch[0], (4, 4, 'baz'))
  16. def test_cancel(self):
  17. tref = timer2.Entry(lambda x: x, (1, ), {})
  18. tref.cancel()
  19. self.assertTrue(tref.cancelled)
  20. class test_Schedule(Case):
  21. def test_supports_Timer_interface(self):
  22. x = timer2.Schedule()
  23. x.stop()
  24. tref = Mock()
  25. x.cancel(tref)
  26. tref.cancel.assert_called_with()
  27. def test_handle_error(self):
  28. from datetime import datetime
  29. to_timestamp = timer2.to_timestamp
  30. scratch = [None]
  31. def _overflow(x):
  32. raise OverflowError(x)
  33. def on_error(exc_info):
  34. scratch[0] = exc_info
  35. s = timer2.Schedule(on_error=on_error)
  36. timer2.to_timestamp = _overflow
  37. try:
  38. s.enter(timer2.Entry(lambda: None, (), {}),
  39. eta=datetime.now())
  40. s.enter(timer2.Entry(lambda: None, (), {}),
  41. eta=None)
  42. s.on_error = None
  43. with self.assertRaises(OverflowError):
  44. s.enter(timer2.Entry(lambda: None, (), {}),
  45. eta=datetime.now())
  46. finally:
  47. timer2.to_timestamp = to_timestamp
  48. exc = scratch[0]
  49. self.assertIsInstance(exc, OverflowError)
  50. class test_Timer(Case):
  51. @skip_if_quick
  52. def test_enter_after(self):
  53. t = timer2.Timer()
  54. try:
  55. done = [False]
  56. def set_done():
  57. done[0] = True
  58. t.apply_after(300, set_done)
  59. mss = 0
  60. while not done[0]:
  61. if mss >= 2.0:
  62. raise Exception('test timed out')
  63. time.sleep(0.1)
  64. mss += 0.1
  65. finally:
  66. t.stop()
  67. def test_exit_after(self):
  68. t = timer2.Timer()
  69. t.apply_after = Mock()
  70. t.exit_after(300, priority=10)
  71. t.apply_after.assert_called_with(300, sys.exit, 10)
  72. def test_apply_interval(self):
  73. t = timer2.Timer()
  74. try:
  75. t.schedule.enter_after = Mock()
  76. myfun = Mock()
  77. myfun.__name__ = 'myfun'
  78. t.apply_interval(30, myfun)
  79. self.assertEqual(t.schedule.enter_after.call_count, 1)
  80. args1, _ = t.schedule.enter_after.call_args_list[0]
  81. msec1, tref1, _ = args1
  82. self.assertEqual(msec1, 30)
  83. tref1()
  84. self.assertEqual(t.schedule.enter_after.call_count, 2)
  85. args2, _ = t.schedule.enter_after.call_args_list[1]
  86. msec2, tref2, _ = args2
  87. self.assertEqual(msec2, 30)
  88. tref2.cancelled = True
  89. tref2()
  90. self.assertEqual(t.schedule.enter_after.call_count, 2)
  91. finally:
  92. t.stop()
  93. @patch('celery.utils.timer2.logger')
  94. def test_apply_entry_error_handled(self, logger):
  95. t = timer2.Timer()
  96. t.schedule.on_error = None
  97. fun = Mock()
  98. fun.side_effect = ValueError()
  99. t.schedule.apply_entry(fun)
  100. self.assertTrue(logger.error.called)
  101. @redirect_stdouts
  102. def test_apply_entry_error_not_handled(self, stdout, stderr):
  103. t = timer2.Timer()
  104. t.schedule.on_error = Mock()
  105. fun = Mock()
  106. fun.side_effect = ValueError()
  107. t.schedule.apply_entry(fun)
  108. fun.assert_called_with()
  109. self.assertFalse(stderr.getvalue())
  110. @patch('os._exit')
  111. def test_thread_crash(self, _exit):
  112. t = timer2.Timer()
  113. t._next_entry = Mock()
  114. t._next_entry.side_effect = OSError(131)
  115. t.run()
  116. _exit.assert_called_with(1)
  117. def test_gc_race_lost(self):
  118. t = timer2.Timer()
  119. t._is_stopped.set = Mock()
  120. t._is_stopped.set.side_effect = TypeError()
  121. t._is_shutdown.set()
  122. t.run()
  123. t._is_stopped.set.assert_called_with()
  124. def test_to_timestamp(self):
  125. self.assertIs(timer2.to_timestamp(3.13), 3.13)
  126. def test_test_enter(self):
  127. t = timer2.Timer()
  128. t._do_enter = Mock()
  129. e = Mock()
  130. t.enter(e, 13, 0)
  131. t._do_enter.assert_called_with('enter', e, 13, priority=0)
  132. def test_test_enter_after(self):
  133. t = timer2.Timer()
  134. t._do_enter = Mock()
  135. t.enter_after()
  136. t._do_enter.assert_called_with('enter_after')
  137. def test_cancel(self):
  138. t = timer2.Timer()
  139. tref = Mock()
  140. t.cancel(tref)
  141. tref.cancel.assert_called_with()