test_eventlet.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. from __future__ import absolute_import
  2. import os
  3. import sys
  4. from celery.concurrency.eventlet import (
  5. apply_target,
  6. Timer,
  7. TaskPool,
  8. )
  9. from celery.tests.case import AppCase, Mock, patch, skip_if_pypy
  10. class EventletCase(AppCase):
  11. @skip_if_pypy
  12. def setup(self):
  13. self.mock_modules(*eventlet_modules)
  14. @skip_if_pypy
  15. def teardown(self):
  16. for mod in [mod for mod in sys.modules if mod.startswith('eventlet')]:
  17. try:
  18. del(sys.modules[mod])
  19. except KeyError:
  20. pass
  21. class test_aaa_eventlet_patch(EventletCase):
  22. def test_aaa_is_patched(self):
  23. with patch('eventlet.monkey_patch', create=True) as monkey_patch:
  24. from celery import maybe_patch_concurrency
  25. maybe_patch_concurrency(['x', '-P', 'eventlet'])
  26. monkey_patch.assert_called_with()
  27. @patch('eventlet.debug.hub_blocking_detection', create=True)
  28. @patch('eventlet.monkey_patch', create=True)
  29. def test_aaa_blockdetecet(self, monkey_patch, hub_blocking_detection):
  30. os.environ['EVENTLET_NOBLOCK'] = "10.3"
  31. try:
  32. from celery import maybe_patch_concurrency
  33. maybe_patch_concurrency(['x', '-P', 'eventlet'])
  34. monkey_patch.assert_called_with()
  35. hub_blocking_detection.assert_called_with(10.3, 10.3)
  36. finally:
  37. os.environ.pop('EVENTLET_NOBLOCK', None)
  38. eventlet_modules = (
  39. 'eventlet',
  40. 'eventlet.debug',
  41. 'eventlet.greenthread',
  42. 'eventlet.greenpool',
  43. 'greenlet',
  44. )
  45. class test_Timer(EventletCase):
  46. def setup(self):
  47. EventletCase.setup(self)
  48. self.spawn_after = self.patch('eventlet.greenthread.spawn_after')
  49. self.GreenletExit = self.patch('greenlet.GreenletExit')
  50. def test_sched(self):
  51. x = Timer()
  52. x.GreenletExit = KeyError
  53. entry = Mock()
  54. g = x._enter(1, 0, entry)
  55. self.assertTrue(x.queue)
  56. x._entry_exit(g, entry)
  57. g.wait.side_effect = KeyError()
  58. x._entry_exit(g, entry)
  59. entry.cancel.assert_called_with()
  60. self.assertFalse(x._queue)
  61. x._queue.add(g)
  62. x.clear()
  63. x._queue.add(g)
  64. g.cancel.side_effect = KeyError()
  65. x.clear()
  66. def test_cancel(self):
  67. x = Timer()
  68. tref = Mock(name='tref')
  69. x.cancel(tref)
  70. tref.cancel.assert_called_with()
  71. x.GreenletExit = KeyError
  72. tref.cancel.side_effect = KeyError()
  73. x.cancel(tref)
  74. class test_TaskPool(EventletCase):
  75. def setup(self):
  76. EventletCase.setup(self)
  77. self.GreenPool = self.patch('eventlet.greenpool.GreenPool')
  78. self.greenthread = self.patch('eventlet.greenthread')
  79. def test_pool(self):
  80. x = TaskPool()
  81. x.on_start()
  82. x.on_stop()
  83. x.on_apply(Mock())
  84. x._pool = None
  85. x.on_stop()
  86. self.assertTrue(x.getpid())
  87. @patch('celery.concurrency.eventlet.base')
  88. def test_apply_target(self, base):
  89. apply_target(Mock(), getpid=Mock())
  90. self.assertTrue(base.apply_target.called)
  91. def test_grow(self):
  92. x = TaskPool(10)
  93. x._pool = Mock(name='_pool')
  94. x.grow(2)
  95. self.assertEqual(x.limit, 12)
  96. x._pool.resize.assert_called_with(12)
  97. def test_shrink(self):
  98. x = TaskPool(10)
  99. x._pool = Mock(name='_pool')
  100. x.shrink(2)
  101. self.assertEqual(x.limit, 8)
  102. x._pool.resize.assert_called_with(8)
  103. def test_get_info(self):
  104. x = TaskPool(10)
  105. x._pool = Mock(name='_pool')
  106. self.assertDictEqual(x._get_info(), {
  107. 'max-concurrency': 10,
  108. 'free-threads': x._pool.free(),
  109. 'running-threads': x._pool.running(),
  110. })