|
@@ -7,6 +7,7 @@ import sys
|
|
|
from nose import SkipTest
|
|
|
from mock import patch, Mock
|
|
|
|
|
|
+from celery.app.defaults import is_pypy
|
|
|
from celery.concurrency.eventlet import (
|
|
|
apply_target,
|
|
|
Schedule,
|
|
@@ -21,6 +22,8 @@ class EventletCase(Case):
|
|
|
|
|
|
@skip_if_pypy
|
|
|
def setUp(self):
|
|
|
+ if is_pypy:
|
|
|
+ raise SkipTest('mock_modules not working on PyPy1.9')
|
|
|
try:
|
|
|
self.eventlet = __import__('eventlet')
|
|
|
except ImportError:
|
|
@@ -54,7 +57,7 @@ eventlet_modules = (
|
|
|
)
|
|
|
|
|
|
|
|
|
-class test_Schedule(Case):
|
|
|
+class test_Schedule(EventletCase):
|
|
|
|
|
|
def test_sched(self):
|
|
|
with mock_module(*eventlet_modules):
|
|
@@ -80,7 +83,7 @@ class test_Schedule(Case):
|
|
|
x.clear()
|
|
|
|
|
|
|
|
|
-class test_TasKPool(Case):
|
|
|
+class test_TaskPool(EventletCase):
|
|
|
|
|
|
def test_pool(self):
|
|
|
with mock_module(*eventlet_modules):
|
|
@@ -101,7 +104,7 @@ class test_TasKPool(Case):
|
|
|
self.assertTrue(base.apply_target.called)
|
|
|
|
|
|
|
|
|
-class test_Timer(Case):
|
|
|
+class test_Timer(EventletCase):
|
|
|
|
|
|
def test_timer(self):
|
|
|
x = Timer()
|