test_gevent.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. from __future__ import absolute_import
  2. from celery.concurrency.gevent import (
  3. Timer,
  4. TaskPool,
  5. apply_timeout,
  6. )
  7. from celery.tests.case import AppCase, Mock, patch, skip_if_pypy
  8. gevent_modules = (
  9. 'gevent',
  10. 'gevent.monkey',
  11. 'gevent.greenlet',
  12. 'gevent.pool',
  13. 'greenlet',
  14. )
  15. class GeventCase(AppCase):
  16. @skip_if_pypy
  17. def setup(self):
  18. self.mock_modules(*gevent_modules)
  19. class test_gevent_patch(GeventCase):
  20. def test_is_patched(self):
  21. with patch('gevent.monkey.patch_all', create=True) as patch_all:
  22. import gevent
  23. gevent.version_info = (1, 0, 0)
  24. from celery import maybe_patch_concurrency
  25. maybe_patch_concurrency(['x', '-P', 'gevent'])
  26. self.assertTrue(patch_all.called)
  27. class test_Timer(GeventCase):
  28. def setup(self):
  29. GeventCase.setup(self)
  30. self.greenlet = self.patch('gevent.greenlet')
  31. self.GreenletExit = self.patch('gevent.greenlet.GreenletExit')
  32. def test_sched(self):
  33. self.greenlet.Greenlet = object
  34. x = Timer()
  35. self.greenlet.Greenlet = Mock()
  36. x._Greenlet.spawn_later = Mock()
  37. x._GreenletExit = KeyError
  38. entry = Mock()
  39. g = x._enter(1, 0, entry)
  40. self.assertTrue(x.queue)
  41. x._entry_exit(g)
  42. g.kill.assert_called_with()
  43. self.assertFalse(x._queue)
  44. x._queue.add(g)
  45. x.clear()
  46. x._queue.add(g)
  47. g.kill.side_effect = KeyError()
  48. x.clear()
  49. g = x._Greenlet()
  50. g.cancel()
  51. class test_TaskPool(GeventCase):
  52. def setup(self):
  53. GeventCase.setup(self)
  54. self.spawn_raw = self.patch('gevent.spawn_raw')
  55. self.Pool = self.patch('gevent.pool.Pool')
  56. def test_pool(self):
  57. x = TaskPool()
  58. x.on_start()
  59. x.on_stop()
  60. x.on_apply(Mock())
  61. x._pool = None
  62. x.on_stop()
  63. x._pool = Mock()
  64. x._pool._semaphore.counter = 1
  65. x._pool.size = 1
  66. x.grow()
  67. self.assertEqual(x._pool.size, 2)
  68. self.assertEqual(x._pool._semaphore.counter, 2)
  69. x.shrink()
  70. self.assertEqual(x._pool.size, 1)
  71. self.assertEqual(x._pool._semaphore.counter, 1)
  72. x._pool = [4, 5, 6]
  73. self.assertEqual(x.num_processes, 3)
  74. class test_apply_timeout(AppCase):
  75. def test_apply_timeout(self):
  76. class Timeout(Exception):
  77. value = None
  78. def __init__(self, value):
  79. self.__class__.value = value
  80. def __enter__(self):
  81. return self
  82. def __exit__(self, *exc_info):
  83. pass
  84. timeout_callback = Mock(name='timeout_callback')
  85. apply_target = Mock(name='apply_target')
  86. apply_timeout(
  87. Mock(), timeout=10, callback=Mock(name='callback'),
  88. timeout_callback=timeout_callback,
  89. apply_target=apply_target, Timeout=Timeout,
  90. )
  91. self.assertEqual(Timeout.value, 10)
  92. self.assertTrue(apply_target.called)
  93. apply_target.side_effect = Timeout(10)
  94. apply_timeout(
  95. Mock(), timeout=10, callback=Mock(),
  96. timeout_callback=timeout_callback,
  97. apply_target=apply_target, Timeout=Timeout,
  98. )
  99. timeout_callback.assert_called_with(False, 10)