|
@@ -18,7 +18,7 @@ from celery.worker.autoreload import (
|
|
|
Autoreloader,
|
|
|
)
|
|
|
|
|
|
-from celery.tests.case import AppCase, Case, Mock, patch, mock_open
|
|
|
+from celery.tests.case import AppCase, Case, Mock, SkipTest, patch, mock_open
|
|
|
|
|
|
|
|
|
class test_WorkerComponent(AppCase):
|
|
@@ -34,7 +34,8 @@ class test_WorkerComponent(AppCase):
|
|
|
|
|
|
@patch('select.kevent', create=True)
|
|
|
@patch('select.kqueue', create=True)
|
|
|
- def test_create_ev(self, kqueue, kevent):
|
|
|
+ @patch('kombu.utils.eventio.kqueue')
|
|
|
+ def test_create_ev(self, kq, kqueue, kevent):
|
|
|
w = Mock()
|
|
|
w.use_eventloop = True
|
|
|
x = WorkerComponent(w)
|
|
@@ -122,6 +123,9 @@ class test_KQueueMonitor(Case):
|
|
|
x.stop()
|
|
|
|
|
|
def test_register_with_event_loop(self):
|
|
|
+ from kombu.utils import eventio
|
|
|
+ if eventio.kqueue is None:
|
|
|
+ raise SkipTest('version of kombu does not work with pypy')
|
|
|
x = KQueueMonitor(['a', 'b'])
|
|
|
hub = Mock(name='hub')
|
|
|
x.add_events = Mock(name='add_events()')
|
|
@@ -217,7 +221,8 @@ class test_InotifyMonitor(Case):
|
|
|
class test_default_implementation(Case):
|
|
|
|
|
|
@patch('select.kqueue', create=True)
|
|
|
- def test_kqueue(self, kqueue):
|
|
|
+ @patch('kombu.utils.eventio.kqueue', create=True)
|
|
|
+ def test_kqueue(self, kq, kqueue):
|
|
|
self.assertEqual(default_implementation(), 'kqueue')
|
|
|
|
|
|
@patch('celery.worker.autoreload.pyinotify')
|