Преглед на файлове

Tests for process pool restart

Mher Movsisyan преди 13 години
родител
ревизия
38fe28a292
променени са 2 файла, в които са добавени 19 реда и са изтрити 0 реда
  1. 6 0
      celery/tests/test_concurrency/__init__.py
  2. 13 0
      celery/tests/test_concurrency/test_concurrency_processes.py

+ 6 - 0
celery/tests/test_concurrency/__init__.py

@@ -1,4 +1,5 @@
 from __future__ import absolute_import
+from __future__ import with_statement
 
 import os
 
@@ -63,3 +64,8 @@ class test_BasePool(unittest.TestCase):
         self.assertFalse(p.active)
         p._state = p.RUN
         self.assertTrue(p.active)
+
+    def test_restart(self):
+        p = BasePool(10)
+        with self.assertRaises(NotImplementedError):
+            p.restart()

+ 13 - 0
celery/tests/test_concurrency/test_concurrency_processes.py

@@ -3,6 +3,7 @@ from __future__ import with_statement
 
 import signal
 import sys
+import time
 
 from itertools import cycle
 
@@ -226,3 +227,15 @@ class test_TaskPool(unittest.TestCase):
         self.assertEqual(info["max-concurrency"], pool.limit)
         self.assertIsNone(info["max-tasks-per-child"])
         self.assertEqual(info["timeouts"], (5, 10))
+
+    def test_restart(self):
+        def get_pids(pool):
+            return set([p.pid for p in pool._pool._pool])
+
+        tp = self.TaskPool(5)
+        time.sleep(0.5)
+        tp.start()
+        pids = get_pids(tp)
+        tp.restart()
+        time.sleep(0.5)
+        self.assertEqual(pids, get_pids(tp))