test_gevent.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. from __future__ import absolute_import, unicode_literals
  2. from case import Mock
  3. from celery.concurrency.gevent import TaskPool, Timer, apply_timeout
  4. gevent_modules = (
  5. 'gevent',
  6. 'gevent.monkey',
  7. 'gevent.greenlet',
  8. 'gevent.pool',
  9. 'greenlet',
  10. )
  11. class test_gevent_patch:
  12. def test_is_patched(self):
  13. self.patching.modules(*gevent_modules)
  14. patch_all = self.patching('gevent.monkey.patch_all')
  15. import gevent
  16. gevent.version_info = (1, 0, 0)
  17. from celery import maybe_patch_concurrency
  18. maybe_patch_concurrency(['x', '-P', 'gevent'])
  19. patch_all.assert_called()
  20. class test_Timer:
  21. def setup(self):
  22. self.patching.modules(*gevent_modules)
  23. self.greenlet = self.patching('gevent.greenlet')
  24. self.GreenletExit = self.patching('gevent.greenlet.GreenletExit')
  25. def test_sched(self):
  26. self.greenlet.Greenlet = object
  27. x = Timer()
  28. self.greenlet.Greenlet = Mock()
  29. x._Greenlet.spawn_later = Mock()
  30. x._GreenletExit = KeyError
  31. entry = Mock()
  32. g = x._enter(1, 0, entry)
  33. assert x.queue
  34. x._entry_exit(g)
  35. g.kill.assert_called_with()
  36. assert not x._queue
  37. x._queue.add(g)
  38. x.clear()
  39. x._queue.add(g)
  40. g.kill.side_effect = KeyError()
  41. x.clear()
  42. g = x._Greenlet()
  43. g.cancel()
  44. class test_TaskPool:
  45. def setup(self):
  46. self.patching.modules(*gevent_modules)
  47. self.spawn_raw = self.patching('gevent.spawn_raw')
  48. self.Pool = self.patching('gevent.pool.Pool')
  49. def test_pool(self):
  50. x = TaskPool()
  51. x.on_start()
  52. x.on_stop()
  53. x.on_apply(Mock())
  54. x._pool = None
  55. x.on_stop()
  56. x._pool = Mock()
  57. x._pool._semaphore.counter = 1
  58. x._pool.size = 1
  59. x.grow()
  60. assert x._pool.size == 2
  61. assert x._pool._semaphore.counter == 2
  62. x.shrink()
  63. assert x._pool.size, 1
  64. assert x._pool._semaphore.counter == 1
  65. x._pool = [4, 5, 6]
  66. assert x.num_processes == 3
  67. class test_apply_timeout:
  68. def test_apply_timeout(self):
  69. self.patching.modules(*gevent_modules)
  70. class Timeout(Exception):
  71. value = None
  72. def __init__(self, value):
  73. self.__class__.value = value
  74. def __enter__(self):
  75. return self
  76. def __exit__(self, *exc_info):
  77. pass
  78. timeout_callback = Mock(name='timeout_callback')
  79. apply_target = Mock(name='apply_target')
  80. apply_timeout(
  81. Mock(), timeout=10, callback=Mock(name='callback'),
  82. timeout_callback=timeout_callback,
  83. apply_target=apply_target, Timeout=Timeout,
  84. )
  85. assert Timeout.value == 10
  86. apply_target.assert_called()
  87. apply_target.side_effect = Timeout(10)
  88. apply_timeout(
  89. Mock(), timeout=10, callback=Mock(),
  90. timeout_callback=timeout_callback,
  91. apply_target=apply_target, Timeout=Timeout,
  92. )
  93. timeout_callback.assert_called_with(False, 10)