test_gevent.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. from case import Mock, skip
  2. from celery.concurrency.gevent import (
  3. Timer,
  4. TaskPool,
  5. apply_timeout,
  6. )
  7. gevent_modules = (
  8. 'gevent',
  9. 'gevent.monkey',
  10. 'gevent.greenlet',
  11. 'gevent.pool',
  12. 'greenlet',
  13. )
  14. @skip.if_pypy()
  15. class test_gevent_patch:
  16. def test_is_patched(self):
  17. self.patching.modules(*gevent_modules)
  18. patch_all = self.patching('gevent.monkey.patch_all')
  19. import gevent
  20. gevent.version_info = (1, 0, 0)
  21. from celery import maybe_patch_concurrency
  22. maybe_patch_concurrency(['x', '-P', 'gevent'])
  23. patch_all.assert_called()
  24. @skip.if_pypy()
  25. class test_Timer:
  26. def setup(self):
  27. self.patching.modules(*gevent_modules)
  28. self.greenlet = self.patching('gevent.greenlet')
  29. self.GreenletExit = self.patching('gevent.greenlet.GreenletExit')
  30. def test_sched(self):
  31. self.greenlet.Greenlet = object
  32. x = Timer()
  33. self.greenlet.Greenlet = Mock()
  34. x._Greenlet.spawn_later = Mock()
  35. x._GreenletExit = KeyError
  36. entry = Mock()
  37. g = x._enter(1, 0, entry)
  38. assert x.queue
  39. x._entry_exit(g)
  40. g.kill.assert_called_with()
  41. assert not x._queue
  42. x._queue.add(g)
  43. x.clear()
  44. x._queue.add(g)
  45. g.kill.side_effect = KeyError()
  46. x.clear()
  47. g = x._Greenlet()
  48. g.cancel()
  49. @skip.if_pypy()
  50. class test_TaskPool:
  51. def setup(self):
  52. self.patching.modules(*gevent_modules)
  53. self.spawn_raw = self.patching('gevent.spawn_raw')
  54. self.Pool = self.patching('gevent.pool.Pool')
  55. def test_pool(self):
  56. x = TaskPool()
  57. x.on_start()
  58. x.on_stop()
  59. x.on_apply(Mock())
  60. x._pool = None
  61. x.on_stop()
  62. x._pool = Mock()
  63. x._pool._semaphore.counter = 1
  64. x._pool.size = 1
  65. x.grow()
  66. assert x._pool.size == 2
  67. assert x._pool._semaphore.counter == 2
  68. x.shrink()
  69. assert x._pool.size, 1
  70. assert x._pool._semaphore.counter == 1
  71. x._pool = [4, 5, 6]
  72. assert x.num_processes == 3
  73. @skip.if_pypy()
  74. class test_apply_timeout:
  75. def test_apply_timeout(self):
  76. self.patching.modules(*gevent_modules)
  77. class Timeout(Exception):
  78. value = None
  79. def __init__(self, value):
  80. self.__class__.value = value
  81. def __enter__(self):
  82. return self
  83. def __exit__(self, *exc_info):
  84. pass
  85. timeout_callback = Mock(name='timeout_callback')
  86. apply_target = Mock(name='apply_target')
  87. apply_timeout(
  88. Mock(), timeout=10, callback=Mock(name='callback'),
  89. timeout_callback=timeout_callback,
  90. apply_target=apply_target, Timeout=Timeout,
  91. )
  92. assert Timeout.value == 10
  93. apply_target.assert_called()
  94. apply_target.side_effect = Timeout(10)
  95. apply_timeout(
  96. Mock(), timeout=10, callback=Mock(),
  97. timeout_callback=timeout_callback,
  98. apply_target=apply_target, Timeout=Timeout,
  99. )
  100. timeout_callback.assert_called_with(False, 10)