test_gevent.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. from __future__ import absolute_import
  2. from nose import SkipTest
  3. from mock import Mock
  4. from celery.concurrency.gevent import (
  5. Schedule,
  6. Timer,
  7. TaskPool,
  8. apply_timeout,
  9. )
  10. from celery.tests.case import (
  11. AppCase, mock_module, patch, patch_many, skip_if_pypy,
  12. )
  13. gevent_modules = (
  14. 'gevent',
  15. 'gevent.monkey',
  16. 'gevent.greenlet',
  17. 'gevent.pool',
  18. 'greenlet',
  19. )
  20. class GeventCase(AppCase):
  21. @skip_if_pypy
  22. def setup(self):
  23. try:
  24. self.gevent = __import__('gevent')
  25. except ImportError:
  26. raise SkipTest(
  27. 'gevent not installed, skipping related tests.')
  28. class test_gevent_patch(GeventCase):
  29. def test_is_patched(self):
  30. with mock_module(*gevent_modules):
  31. with patch('gevent.monkey.patch_all', create=True) as patch_all:
  32. import gevent
  33. gevent.version_info = (1, 0, 0)
  34. from celery import maybe_patch_concurrency
  35. maybe_patch_concurrency(['x', '-P', 'gevent'])
  36. self.assertTrue(patch_all.called)
  37. class test_Schedule(AppCase):
  38. def test_sched(self):
  39. with mock_module(*gevent_modules):
  40. with patch_many('gevent.greenlet',
  41. 'gevent.greenlet.GreenletExit') as (greenlet,
  42. GreenletExit):
  43. greenlet.Greenlet = object
  44. x = Schedule()
  45. greenlet.Greenlet = Mock()
  46. x._Greenlet.spawn_later = Mock()
  47. x._GreenletExit = KeyError
  48. entry = Mock()
  49. g = x._enter(1, 0, entry)
  50. self.assertTrue(x.queue)
  51. x._entry_exit(g)
  52. g.kill.assert_called_with()
  53. self.assertFalse(x._queue)
  54. x._queue.add(g)
  55. x.clear()
  56. x._queue.add(g)
  57. g.kill.side_effect = KeyError()
  58. x.clear()
  59. g = x._Greenlet()
  60. g.cancel()
  61. class test_TaskPool(AppCase):
  62. def test_pool(self):
  63. with mock_module(*gevent_modules):
  64. with patch_many('gevent.spawn_raw', 'gevent.pool.Pool') as (
  65. spawn_raw, Pool):
  66. x = TaskPool()
  67. x.on_start()
  68. x.on_stop()
  69. x.on_apply(Mock())
  70. x._pool = None
  71. x.on_stop()
  72. x._pool = Mock()
  73. x._pool._semaphore.counter = 1
  74. x._pool.size = 1
  75. x.grow()
  76. self.assertEqual(x._pool.size, 2)
  77. self.assertEqual(x._pool._semaphore.counter, 2)
  78. x.shrink()
  79. self.assertEqual(x._pool.size, 1)
  80. self.assertEqual(x._pool._semaphore.counter, 1)
  81. x._pool = [4, 5, 6]
  82. self.assertEqual(x.num_processes, 3)
  83. class test_Timer(AppCase):
  84. def test_timer(self):
  85. with mock_module(*gevent_modules):
  86. x = Timer()
  87. x.ensure_started()
  88. x.schedule = Mock()
  89. x.start()
  90. x.stop()
  91. x.schedule.clear.assert_called_with()
  92. class test_apply_timeout(AppCase):
  93. def test_apply_timeout(self):
  94. class Timeout(Exception):
  95. value = None
  96. def __init__(self, value):
  97. self.__class__.value = value
  98. def __enter__(self):
  99. return self
  100. def __exit__(self, *exc_info):
  101. pass
  102. timeout_callback = Mock(name='timeout_callback')
  103. apply_target = Mock(name='apply_target')
  104. apply_timeout(
  105. Mock(), timeout=10, callback=Mock(name='callback'),
  106. timeout_callback=timeout_callback,
  107. apply_target=apply_target, Timeout=Timeout,
  108. )
  109. self.assertEqual(Timeout.value, 10)
  110. self.assertTrue(apply_target.called)
  111. apply_target.side_effect = Timeout(10)
  112. apply_timeout(
  113. Mock(), timeout=10, callback=Mock(),
  114. timeout_callback=timeout_callback,
  115. apply_target=apply_target, Timeout=Timeout,
  116. )
  117. timeout_callback.assert_called_with(False, 10)