|
@@ -6,6 +6,7 @@ import sys
|
|
|
from nose import SkipTest
|
|
|
from mock import patch, Mock
|
|
|
|
|
|
+from celery.app.defaults import is_pypy
|
|
|
from celery.concurrency.eventlet import (
|
|
|
apply_target,
|
|
|
Schedule,
|
|
@@ -20,6 +21,8 @@ class EventletCase(Case):
|
|
|
|
|
|
@skip_if_pypy
|
|
|
def setUp(self):
|
|
|
+ if is_pypy:
|
|
|
+ raise SkipTest('mock_modules not working on PyPy1.9')
|
|
|
try:
|
|
|
self.eventlet = __import__('eventlet')
|
|
|
except ImportError:
|
|
@@ -53,7 +56,7 @@ eventlet_modules = (
|
|
|
)
|
|
|
|
|
|
|
|
|
-class test_Schedule(Case):
|
|
|
+class test_Schedule(EventletCase):
|
|
|
|
|
|
def test_sched(self):
|
|
|
with mock_module(*eventlet_modules):
|
|
@@ -79,7 +82,7 @@ class test_Schedule(Case):
|
|
|
x.clear()
|
|
|
|
|
|
|
|
|
-class test_TasKPool(Case):
|
|
|
+class test_TaskPool(EventletCase):
|
|
|
|
|
|
def test_pool(self):
|
|
|
with mock_module(*eventlet_modules):
|
|
@@ -100,7 +103,7 @@ class test_TasKPool(Case):
|
|
|
self.assertTrue(base.apply_target.called)
|
|
|
|
|
|
|
|
|
-class test_Timer(Case):
|
|
|
+class test_Timer(EventletCase):
|
|
|
|
|
|
def test_timer(self):
|
|
|
x = Timer()
|