test_concurrency_eventlet.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. from __future__ import absolute_import
  2. import os
  3. import sys
  4. from nose import SkipTest
  5. from mock import patch, Mock
  6. from celery.concurrency.eventlet import (
  7. apply_target,
  8. Schedule,
  9. Timer,
  10. TaskPool,
  11. )
  12. from celery.tests.utils import Case, mock_module
  13. class EventletCase(Case):
  14. def setUp(self):
  15. if getattr(sys, "pypy_version_info", None):
  16. raise SkipTest("Does not work on PyPy")
  17. try:
  18. self.eventlet = __import__("eventlet")
  19. except ImportError:
  20. raise SkipTest(
  21. "eventlet not installed, skipping related tests.")
  22. class test_eventlet_patch(EventletCase):
  23. def test_is_patched(self):
  24. monkey_patched = []
  25. prev_monkey_patch = self.eventlet.monkey_patch
  26. self.eventlet.monkey_patch = lambda: monkey_patched.append(True)
  27. prev_eventlet = sys.modules.pop("celery.concurrency.eventlet", None)
  28. os.environ.pop("EVENTLET_NOPATCH")
  29. try:
  30. import celery.concurrency.eventlet # noqa
  31. self.assertTrue(monkey_patched)
  32. finally:
  33. sys.modules["celery.concurrency.eventlet"] = prev_eventlet
  34. os.environ["EVENTLET_NOPATCH"] = "yes"
  35. self.eventlet.monkey_patch = prev_monkey_patch
  36. eventlet_modules = (
  37. "eventlet",
  38. "eventlet.debug",
  39. "eventlet.greenthread",
  40. "eventlet.greenpool",
  41. "greenlet",
  42. )
  43. class test_Schedule(Case):
  44. def test_sched(self):
  45. with mock_module(*eventlet_modules):
  46. @patch("eventlet.greenthread.spawn_after")
  47. @patch("greenlet.GreenletExit")
  48. def do_test(GreenletExit, spawn_after):
  49. x = Schedule()
  50. x.GreenletExit = KeyError
  51. entry = Mock()
  52. g = x._enter(1, 0, entry)
  53. self.assertTrue(x.queue)
  54. x._entry_exit(g, entry)
  55. g.wait.side_effect = KeyError()
  56. x._entry_exit(g, entry)
  57. entry.cancel.assert_called_with()
  58. self.assertFalse(x._queue)
  59. x._queue.add(g)
  60. x.clear()
  61. x._queue.add(g)
  62. g.cancel.side_effect = KeyError()
  63. x.clear()
  64. do_test()
  65. class test_TasKPool(Case):
  66. def test_pool(self):
  67. with mock_module(*eventlet_modules):
  68. @patch("eventlet.greenpool.GreenPool")
  69. @patch("eventlet.greenthread")
  70. def do_test(greenthread, GreenPool):
  71. x = TaskPool()
  72. x.on_start()
  73. x.on_stop()
  74. x.on_apply(Mock())
  75. x._pool = None
  76. x.on_stop()
  77. self.assertTrue(x.getpid())
  78. do_test()
  79. @patch("celery.concurrency.eventlet.base")
  80. def test_apply_target(self, base):
  81. apply_target(Mock(), getpid=Mock())
  82. self.assertTrue(base.apply_target.called)
  83. class test_Timer(Case):
  84. def test_timer(self):
  85. x = Timer()
  86. x.ensure_started()
  87. x.schedule = Mock()
  88. x.start()
  89. x.stop()
  90. x.schedule.clear.assert_called_with()
  91. tref = Mock()
  92. x.cancel(tref)
  93. x.schedule.GreenletExit = KeyError
  94. tref.cancel.side_effect = KeyError()
  95. x.cancel(tref)