Просмотр исходного кода

100% coverage for celery.worker.components

Ask Solem 12 лет назад
Родитель
Сommit
0b2850a82f
1 измененных файлов с 40 добавлено и 0 удалено
  1. 40 0
      celery/tests/worker/test_components.py

+ 40 - 0
celery/tests/worker/test_components.py

@@ -0,0 +1,40 @@
+from __future__ import absolute_import
+
+# some of these are tested in test_worker, so I've only written tests
+# here to complete coverage.  Should move everyting to this module at some
+# point [-ask]
+
+from mock import Mock
+
+from celery.worker.components import (
+    Queues,
+    Pool,
+)
+
+from celery.tests.utils import AppCase
+
+
+class test_Queues(AppCase):
+
+    def test_create_when_eventloop(self):
+        w = Mock()
+        w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True
+        q = Queues(w)
+        q.create(w)
+        self.assertIs(w.process_task, w._process_task_sem)
+
+
+class test_Pool(AppCase):
+
+    def test_close_terminate(self):
+        w = Mock()
+        comp = Pool(w)
+        pool = w.pool = Mock()
+        comp.close(w)
+        pool.close.assert_called_with()
+        comp.terminate(w)
+        pool.terminate.assert_called_with()
+
+        w.pool = None
+        comp.close(w)
+        comp.terminate(w)