test_gevent.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import os
  4. import sys
  5. from nose import SkipTest
  6. from mock import patch, Mock
  7. from celery.concurrency.gevent import (
  8. Schedule,
  9. Timer,
  10. TaskPool,
  11. )
  12. from celery.tests.utils import Case, mock_module
  13. gevent_modules = (
  14. "gevent",
  15. "gevent.monkey",
  16. "gevent.greenlet",
  17. "gevent.pool",
  18. "greenlet",
  19. )
  20. class GeventCase(Case):
  21. def setUp(self):
  22. if getattr(sys, "pypy_version_info", None):
  23. raise SkipTest("Does not work on PyPy")
  24. try:
  25. self.eventlet = __import__("gevent")
  26. except ImportError:
  27. raise SkipTest(
  28. "gevent not installed, skipping related tests.")
  29. class test_gevent_patch(GeventCase):
  30. def test_is_patched(self):
  31. with mock_module(*gevent_modules):
  32. monkey_patched = []
  33. from gevent import monkey
  34. prev_monkey_patch = monkey.patch_all
  35. monkey.patch_all = lambda: monkey_patched.append(True)
  36. prev_gevent = sys.modules.pop("celery.concurrency.gevent", None)
  37. os.environ.pop("GEVENT_NOPATCH")
  38. try:
  39. import celery.concurrency.gevent # noqa
  40. self.assertTrue(monkey_patched)
  41. finally:
  42. sys.modules["celery.concurrency.gevent"] = prev_gevent
  43. os.environ["GEVENT_NOPATCH"] = "yes"
  44. monkey.patch_all = prev_monkey_patch
  45. class test_Schedule(Case):
  46. def test_sched(self):
  47. with mock_module(*gevent_modules):
  48. @patch("gevent.greenlet")
  49. @patch("gevent.greenlet.GreenletExit")
  50. def do_test(GreenletExit, greenlet):
  51. greenlet.Greenlet = object
  52. x = Schedule()
  53. greenlet.Greenlet = Mock()
  54. x._Greenlet.spawn_later = Mock()
  55. x._GreenletExit = KeyError
  56. entry = Mock()
  57. g = x._enter(1, 0, entry)
  58. self.assertTrue(x.queue)
  59. x._entry_exit(g)
  60. g.kill.assert_called_with()
  61. self.assertFalse(x._queue)
  62. x._queue.add(g)
  63. x.clear()
  64. x._queue.add(g)
  65. g.kill.side_effect = KeyError()
  66. x.clear()
  67. do_test()
  68. class test_TasKPool(Case):
  69. def test_pool(self):
  70. with mock_module(*gevent_modules):
  71. @patch("gevent.spawn_raw")
  72. @patch("gevent.pool.Pool")
  73. def do_test(Pool, spawn_raw):
  74. x = TaskPool()
  75. x.on_start()
  76. x.on_stop()
  77. x.on_apply(Mock())
  78. x._pool = None
  79. x.on_stop()
  80. x._pool = Mock()
  81. x._pool._semaphore.counter = 1
  82. x._pool.size = 1
  83. x.grow()
  84. self.assertEqual(x._pool.size, 2)
  85. self.assertEqual(x._pool._semaphore.counter, 2)
  86. x.shrink()
  87. self.assertEqual(x._pool.size, 1)
  88. self.assertEqual(x._pool._semaphore.counter, 1)
  89. x._pool = [4, 5, 6]
  90. self.assertEqual(x.num_processes, 3)
  91. do_test()
  92. class test_Timer(Case):
  93. def test_timer(self):
  94. x = Timer()
  95. x.ensure_started()
  96. x.schedule = Mock()
  97. x.start()
  98. x.stop()
  99. x.schedule.clear.assert_called_with()