test_eventlet.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. from __future__ import absolute_import
  2. import os
  3. import sys
  4. from nose import SkipTest
  5. from mock import patch, Mock
  6. from celery.app.defaults import is_pypy
  7. from celery.concurrency.eventlet import (
  8. apply_target,
  9. Schedule,
  10. Timer,
  11. TaskPool,
  12. )
  13. from celery.tests.utils import Case, mock_module, patch_many, skip_if_pypy
  14. class EventletCase(Case):
  15. @skip_if_pypy
  16. def setUp(self):
  17. if is_pypy:
  18. raise SkipTest('mock_modules not working on PyPy1.9')
  19. try:
  20. self.eventlet = __import__('eventlet')
  21. except ImportError:
  22. raise SkipTest(
  23. 'eventlet not installed, skipping related tests.')
  24. @skip_if_pypy
  25. def tearDown(self):
  26. for mod in [mod for mod in sys.modules if mod.startswith('eventlet')]:
  27. try:
  28. del(sys.modules[mod])
  29. except KeyError:
  30. pass
  31. class test_aaa_eventlet_patch(EventletCase):
  32. def test_aaa_is_patched(self):
  33. raise SkipTest('side effects')
  34. monkey_patched = []
  35. prev_monkey_patch = self.eventlet.monkey_patch
  36. self.eventlet.monkey_patch = lambda: monkey_patched.append(True)
  37. prev_eventlet = sys.modules.pop('celery.concurrency.eventlet', None)
  38. os.environ.pop('EVENTLET_NOPATCH')
  39. try:
  40. import celery.concurrency.eventlet # noqa
  41. self.assertTrue(monkey_patched)
  42. finally:
  43. sys.modules['celery.concurrency.eventlet'] = prev_eventlet
  44. os.environ['EVENTLET_NOPATCH'] = 'yes'
  45. self.eventlet.monkey_patch = prev_monkey_patch
  46. eventlet_modules = (
  47. 'eventlet',
  48. 'eventlet.debug',
  49. 'eventlet.greenthread',
  50. 'eventlet.greenpool',
  51. 'greenlet',
  52. )
  53. class test_Schedule(EventletCase):
  54. def test_sched(self):
  55. with mock_module(*eventlet_modules):
  56. with patch_many('eventlet.greenthread.spawn_after',
  57. 'greenlet.GreenletExit') as (spawn_after,
  58. GreenletExit):
  59. x = Schedule()
  60. x.GreenletExit = KeyError
  61. entry = Mock()
  62. g = x._enter(1, 0, entry)
  63. self.assertTrue(x.queue)
  64. x._entry_exit(g, entry)
  65. g.wait.side_effect = KeyError()
  66. x._entry_exit(g, entry)
  67. entry.cancel.assert_called_with()
  68. self.assertFalse(x._queue)
  69. x._queue.add(g)
  70. x.clear()
  71. x._queue.add(g)
  72. g.cancel.side_effect = KeyError()
  73. x.clear()
  74. class test_TaskPool(EventletCase):
  75. def test_pool(self):
  76. with mock_module(*eventlet_modules):
  77. with patch_many('eventlet.greenpool.GreenPool',
  78. 'eventlet.greenthread') as (GreenPool,
  79. greenthread):
  80. x = TaskPool()
  81. x.on_start()
  82. x.on_stop()
  83. x.on_apply(Mock())
  84. x._pool = None
  85. x.on_stop()
  86. self.assertTrue(x.getpid())
  87. @patch('celery.concurrency.eventlet.base')
  88. def test_apply_target(self, base):
  89. apply_target(Mock(), getpid=Mock())
  90. self.assertTrue(base.apply_target.called)
  91. class test_Timer(EventletCase):
  92. def test_timer(self):
  93. x = Timer()
  94. x.ensure_started()
  95. x.schedule = Mock()
  96. x.start()
  97. x.stop()
  98. x.schedule.clear.assert_called_with()
  99. tref = Mock()
  100. x.cancel(tref)
  101. x.schedule.GreenletExit = KeyError
  102. tref.cancel.side_effect = KeyError()
  103. x.cancel(tref)