@@ -0,0 +1,122 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+import os
+import sys
+from nose import SkipTest
+from mock import patch, Mock
+from celery.concurrency.gevent import (
+ Schedule,
+ Timer,
+ TaskPool,
+from celery.tests.utils import Case, mock_module
+class GeventCase(Case):
+ def setUp(self):
+ if getattr(sys, "pypy_version_info", None):
+ raise SkipTest("Does not work on PyPy")
+ try:
+ self.eventlet = __import__("gevent")
+ except ImportError:
+ raise SkipTest(
+ "gevent not installed, skipping related tests.")
+class test_gevent_patch(GeventCase):
+ def test_is_patched(self):
+ monkey_patched = []
+ from gevent import monkey
+ prev_monkey_patch = monkey.patch_all
+ monkey.patch_all = lambda: monkey_patched.append(True)
+ prev_gevent = sys.modules.pop("celery.concurrency.gevent", None)
+ os.environ.pop("GEVENT_NOPATCH")
+ try:
+ import celery.concurrency.gevent # noqa
+ self.assertTrue(monkey_patched)
+ finally:
+ sys.modules["celery.concurrency.gevent"] = prev_gevent
+ os.environ["GEVENT_NOPATCH"] = "yes"
+ monkey.patch_all = prev_monkey_patch
+gevent_modules = (
+ "gevent",
+ "gevent.monkey",
+ "gevent.greenlet",
+ "gevent.pool",
+ "greenlet",
+class test_Schedule(Case):
+ def test_sched(self):
+ with mock_module(*gevent_modules):
+ @patch("gevent.greenlet.Greenlet")
+ @patch("gevent.greenlet.GreenletExit")
+ def do_test(Greenlet, GreenletExit):
+ x = Schedule()
+ x._Greenlet.spawn_later = Mock()
+ x._GreenletExit = KeyError
+ entry = Mock()
+ g = x._enter(1, 0, entry)
+ self.assertTrue(x.queue)
+ x._entry_exit(g)
+ g.kill.assert_called_with()
+ self.assertFalse(x._queue)
+ x._queue.add(g)
+ x.clear()
+ x._queue.add(g)
+ g.kill.side_effect = KeyError()
+ x.clear()
+ do_test()
+class test_TasKPool(Case):
+ def test_pool(self):
+ with mock_module(*gevent_modules):
+ @patch("gevent.spawn_raw")
+ @patch("gevent.pool.Pool")
+ def do_test(Pool, spawn_raw):
+ x = TaskPool()
+ x.on_start()
+ x.on_stop()
+ x.on_apply(Mock())
+ x._pool = None
+ x.on_stop()
+ x._pool = Mock()
+ x._pool._semaphore.counter = 1
+ x._pool.size = 1
+ x.grow()
+ self.assertEqual(x._pool.size, 2)
+ self.assertEqual(x._pool._semaphore.counter, 2)
+ x.shrink()
+ self.assertEqual(x._pool.size, 1)
+ self.assertEqual(x._pool._semaphore.counter, 1)
+ x._pool = [4, 5, 6]
+ self.assertEqual(x.num_processes, 3)
+ do_test()
+class test_Timer(Case):
+ def test_timer(self):
+ x = Timer()
+ x.ensure_started()
+ x.schedule = Mock()
+ x.start()
+ x.stop()
+ x.schedule.clear.assert_called_with()