test_eventlet.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. from __future__ import absolute_import
  2. import sys
  3. from nose import SkipTest
  4. from mock import patch, Mock
  5. from celery.app.defaults import is_pypy
  6. from celery.concurrency.eventlet import (
  7. apply_target,
  8. Schedule,
  9. Timer,
  10. TaskPool,
  11. )
  12. from celery.tests.case import AppCase, mock_module, patch_many, skip_if_pypy
  13. class EventletCase(AppCase):
  14. @skip_if_pypy
  15. def setup(self):
  16. if is_pypy:
  17. raise SkipTest('mock_modules not working on PyPy1.9')
  18. try:
  19. self.eventlet = __import__('eventlet')
  20. except ImportError:
  21. raise SkipTest(
  22. 'eventlet not installed, skipping related tests.')
  23. @skip_if_pypy
  24. def teardown(self):
  25. for mod in [mod for mod in sys.modules if mod.startswith('eventlet')]:
  26. try:
  27. del(sys.modules[mod])
  28. except KeyError:
  29. pass
  30. class test_aaa_eventlet_patch(EventletCase):
  31. def test_aaa_is_patched(self):
  32. with patch('eventlet.monkey_patch', create=True) as monkey_patch:
  33. from celery import maybe_patch_concurrency
  34. maybe_patch_concurrency(['x', '-P', 'eventlet'])
  35. monkey_patch.assert_called_with()
  36. eventlet_modules = (
  37. 'eventlet',
  38. 'eventlet.debug',
  39. 'eventlet.greenthread',
  40. 'eventlet.greenpool',
  41. 'greenlet',
  42. )
  43. class test_Schedule(EventletCase):
  44. def test_sched(self):
  45. with mock_module(*eventlet_modules):
  46. with patch_many('eventlet.greenthread.spawn_after',
  47. 'greenlet.GreenletExit') as (spawn_after,
  48. GreenletExit):
  49. x = Schedule()
  50. x.GreenletExit = KeyError
  51. entry = Mock()
  52. g = x._enter(1, 0, entry)
  53. self.assertTrue(x.queue)
  54. x._entry_exit(g, entry)
  55. g.wait.side_effect = KeyError()
  56. x._entry_exit(g, entry)
  57. entry.cancel.assert_called_with()
  58. self.assertFalse(x._queue)
  59. x._queue.add(g)
  60. x.clear()
  61. x._queue.add(g)
  62. g.cancel.side_effect = KeyError()
  63. x.clear()
  64. class test_TaskPool(EventletCase):
  65. def test_pool(self):
  66. with mock_module(*eventlet_modules):
  67. with patch_many('eventlet.greenpool.GreenPool',
  68. 'eventlet.greenthread') as (GreenPool,
  69. greenthread):
  70. x = TaskPool()
  71. x.on_start()
  72. x.on_stop()
  73. x.on_apply(Mock())
  74. x._pool = None
  75. x.on_stop()
  76. self.assertTrue(x.getpid())
  77. @patch('celery.concurrency.eventlet.base')
  78. def test_apply_target(self, base):
  79. apply_target(Mock(), getpid=Mock())
  80. self.assertTrue(base.apply_target.called)
  81. class test_Timer(EventletCase):
  82. def test_timer(self):
  83. x = Timer()
  84. x.ensure_started()
  85. x.schedule = Mock()
  86. x.start()
  87. x.stop()
  88. x.schedule.clear.assert_called_with()
  89. tref = Mock()
  90. x.cancel(tref)
  91. x.schedule.GreenletExit = KeyError
  92. tref.cancel.side_effect = KeyError()
  93. x.cancel(tref)