Forráskód Böngészése

Tests for new mediator logic

Ask Solem 13 éve
szülő
commit
bc6f2f0459
1 módosított fájl, 14 hozzáadás és 3 törlés
  1. 14 3
      celery/tests/test_worker/__init__.py

+ 14 - 3
celery/tests/test_worker/__init__.py

@@ -11,6 +11,7 @@ from Queue import Empty
 from kombu.transport.base import Message
 from kombu.connection import BrokerConnection
 from mock import Mock, patch
+from nose import SkipTest
 
 from celery import current_app
 from celery.concurrency.base import BasePool
@@ -902,13 +903,23 @@ class test_WorkController(AppCase):
         finally:
             state.Persistent = Persistent
 
-    def test_disable_rate_limits(self):
-        from celery.worker.buckets import FastQueue
-        worker = self.create_worker(disable_rate_limits=True)
+    def test_disable_rate_limits_solo(self):
+        worker = self.create_worker(disable_rate_limits=True,
+                                    pool_cls="solo")
         self.assertIsInstance(worker.ready_queue, FastQueue)
         self.assertIsNone(worker.mediator)
         self.assertEqual(worker.ready_queue.put, worker.process_task)
 
+    def test_disable_rate_limits_processes(self):
+        try:
+            worker = self.create_worker(disable_rate_limits=True,
+                                        pool_cls="processes")
+        except ImportError:
+            raise SkipTest("multiprocessing not supported")
+        self.assertIsInstance(worker.ready_queue, FastQueue)
+        self.assertTrue(worker.mediator)
+        self.assertNotEqual(worker.ready_queue.put, worker.process_task)
+
     def test_start__stop(self):
         worker = self.worker
         worker._shutdown_complete.set()