test_gevent.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import os
  4. import sys
  5. from nose import SkipTest
  6. from mock import Mock
  7. from celery.concurrency.gevent import (
  8. Schedule,
  9. Timer,
  10. TaskPool,
  11. )
  12. from celery.tests.utils import Case, mock_module, patch_many, skip_if_pypy
  13. gevent_modules = (
  14. 'gevent',
  15. 'gevent.monkey',
  16. 'gevent.greenlet',
  17. 'gevent.pool',
  18. 'greenlet',
  19. )
  20. class GeventCase(Case):
  21. @skip_if_pypy
  22. def setUp(self):
  23. try:
  24. self.gevent = __import__('gevent')
  25. except ImportError:
  26. raise SkipTest(
  27. 'gevent not installed, skipping related tests.')
  28. class test_gevent_patch(GeventCase):
  29. def test_is_patched(self):
  30. with mock_module(*gevent_modules):
  31. monkey_patched = []
  32. import gevent
  33. from gevent import monkey
  34. gevent.version_info = (1, 0, 0)
  35. prev_monkey_patch = monkey.patch_all
  36. monkey.patch_all = lambda: monkey_patched.append(True)
  37. prev_gevent = sys.modules.pop('celery.concurrency.gevent', None)
  38. os.environ.pop('GEVENT_NOPATCH')
  39. try:
  40. import celery.concurrency.gevent # noqa
  41. self.assertTrue(monkey_patched)
  42. finally:
  43. sys.modules['celery.concurrency.gevent'] = prev_gevent
  44. os.environ['GEVENT_NOPATCH'] = 'yes'
  45. monkey.patch_all = prev_monkey_patch
  46. class test_Schedule(Case):
  47. def test_sched(self):
  48. with mock_module(*gevent_modules):
  49. with patch_many('gevent.greenlet',
  50. 'gevent.greenlet.GreenletExit') as (greenlet,
  51. GreenletExit):
  52. greenlet.Greenlet = object
  53. x = Schedule()
  54. greenlet.Greenlet = Mock()
  55. x._Greenlet.spawn_later = Mock()
  56. x._GreenletExit = KeyError
  57. entry = Mock()
  58. g = x._enter(1, 0, entry)
  59. self.assertTrue(x.queue)
  60. x._entry_exit(g)
  61. g.kill.assert_called_with()
  62. self.assertFalse(x._queue)
  63. x._queue.add(g)
  64. x.clear()
  65. x._queue.add(g)
  66. g.kill.side_effect = KeyError()
  67. x.clear()
  68. class test_TasKPool(Case):
  69. def test_pool(self):
  70. with mock_module(*gevent_modules):
  71. with patch_many('gevent.spawn_raw', 'gevent.pool.Pool') as (
  72. spawn_raw, Pool):
  73. x = TaskPool()
  74. x.on_start()
  75. x.on_stop()
  76. x.on_apply(Mock())
  77. x._pool = None
  78. x.on_stop()
  79. x._pool = Mock()
  80. x._pool._semaphore.counter = 1
  81. x._pool.size = 1
  82. x.grow()
  83. self.assertEqual(x._pool.size, 2)
  84. self.assertEqual(x._pool._semaphore.counter, 2)
  85. x.shrink()
  86. self.assertEqual(x._pool.size, 1)
  87. self.assertEqual(x._pool._semaphore.counter, 1)
  88. x._pool = [4, 5, 6]
  89. self.assertEqual(x.num_processes, 3)
  90. class test_Timer(Case):
  91. def test_timer(self):
  92. x = Timer()
  93. x.ensure_started()
  94. x.schedule = Mock()
  95. x.start()
  96. x.stop()
  97. x.schedule.clear.assert_called_with()