test_eventlet.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. import sys
  4. from case import Mock, patch, skip
  5. from celery.concurrency.eventlet import (
  6. apply_target,
  7. Timer,
  8. TaskPool,
  9. )
  10. eventlet_modules = (
  11. 'eventlet',
  12. 'eventlet.debug',
  13. 'eventlet.greenthread',
  14. 'eventlet.greenpool',
  15. 'greenlet',
  16. )
  17. @skip.if_pypy()
  18. class EventletCase:
  19. def setup(self):
  20. self.patching.modules(*eventlet_modules)
  21. def teardown(self):
  22. for mod in [mod for mod in sys.modules
  23. if mod.startswith('eventlet')]:
  24. try:
  25. del(sys.modules[mod])
  26. except KeyError:
  27. pass
  28. class test_aaa_eventlet_patch(EventletCase):
  29. def test_aaa_is_patched(self):
  30. with patch('eventlet.monkey_patch', create=True) as monkey_patch:
  31. from celery import maybe_patch_concurrency
  32. maybe_patch_concurrency(['x', '-P', 'eventlet'])
  33. monkey_patch.assert_called_with()
  34. @patch('eventlet.debug.hub_blocking_detection', create=True)
  35. @patch('eventlet.monkey_patch', create=True)
  36. def test_aaa_blockdetecet(
  37. self, monkey_patch, hub_blocking_detection, patching):
  38. patching.setenv('EVENTLET_NOBLOCK', '10.3')
  39. from celery import maybe_patch_concurrency
  40. maybe_patch_concurrency(['x', '-P', 'eventlet'])
  41. monkey_patch.assert_called_with()
  42. hub_blocking_detection.assert_called_with(10.3, 10.3)
  43. class test_Timer(EventletCase):
  44. @pytest.fixture(autouse=True)
  45. def setup_patches(self, patching):
  46. self.spawn_after = patching('eventlet.greenthread.spawn_after')
  47. self.GreenletExit = patching('greenlet.GreenletExit')
  48. def test_sched(self):
  49. x = Timer()
  50. x.GreenletExit = KeyError
  51. entry = Mock()
  52. g = x._enter(1, 0, entry)
  53. assert x.queue
  54. x._entry_exit(g, entry)
  55. g.wait.side_effect = KeyError()
  56. x._entry_exit(g, entry)
  57. entry.cancel.assert_called_with()
  58. assert not x._queue
  59. x._queue.add(g)
  60. x.clear()
  61. x._queue.add(g)
  62. g.cancel.side_effect = KeyError()
  63. x.clear()
  64. def test_cancel(self):
  65. x = Timer()
  66. tref = Mock(name='tref')
  67. x.cancel(tref)
  68. tref.cancel.assert_called_with()
  69. x.GreenletExit = KeyError
  70. tref.cancel.side_effect = KeyError()
  71. x.cancel(tref)
  72. class test_TaskPool(EventletCase):
  73. @pytest.fixture(autouse=True)
  74. def setup_patches(self, patching):
  75. self.GreenPool = patching('eventlet.greenpool.GreenPool')
  76. self.greenthread = patching('eventlet.greenthread')
  77. def test_pool(self):
  78. x = TaskPool()
  79. x.on_start()
  80. x.on_stop()
  81. x.on_apply(Mock())
  82. x._pool = None
  83. x.on_stop()
  84. assert x.getpid()
  85. @patch('celery.concurrency.eventlet.base')
  86. def test_apply_target(self, base):
  87. apply_target(Mock(), getpid=Mock())
  88. base.apply_target.assert_called()
  89. def test_grow(self):
  90. x = TaskPool(10)
  91. x._pool = Mock(name='_pool')
  92. x.grow(2)
  93. assert x.limit == 12
  94. x._pool.resize.assert_called_with(12)
  95. def test_shrink(self):
  96. x = TaskPool(10)
  97. x._pool = Mock(name='_pool')
  98. x.shrink(2)
  99. assert x.limit == 8
  100. x._pool.resize.assert_called_with(8)
  101. def test_get_info(self):
  102. x = TaskPool(10)
  103. x._pool = Mock(name='_pool')
  104. assert x._get_info() == {
  105. 'max-concurrency': 10,
  106. 'free-threads': x._pool.free(),
  107. 'running-threads': x._pool.running(),
  108. }