Explorar o código

tests: Now mocks the Pool

Ask Solem %!s(int64=15) %!d(string=hai) anos
pai
achega
218dd421b7
Modificáronse 4 ficheiros con 37 adicións e 21 borrados
  1. 4 0
      celery/states.py
  2. 0 7
      celery/tests/test_pool.py
  3. 23 5
      celery/tests/test_worker_job.py
  4. 10 9
      celery/worker/pool.py

+ 4 - 0
celery/states.py

@@ -46,6 +46,10 @@ RETRY = "RETRY"
 
 
     Set of states meaning the task returned an exception.
     Set of states meaning the task returned an exception.
 
 
+.. data:: PROPAGATE_STATES
+
+    Set of exception states that should propagate exceptions to the user.
+
 .. data:: ALL_STATES
 .. data:: ALL_STATES
 
 
     Set of all possible states.
     Set of all possible states.

+ 0 - 7
celery/tests/test_pool.py

@@ -30,13 +30,6 @@ class TestTaskPool(unittest.TestCase):
         self.assertIsInstance(p.logger, logging.Logger)
         self.assertIsInstance(p.logger, logging.Logger)
         self.assertIsNone(p._pool)
         self.assertIsNone(p._pool)
 
 
-    def test_start_stop(self):
-        p = TaskPool(limit=2)
-        p.start()
-        self.assertIsNotNone(p._pool)
-        p.stop()
-        self.assertIsNone(p._pool)
-
     def x_apply(self):
     def x_apply(self):
         p = TaskPool(limit=2)
         p = TaskPool(limit=2)
         p.start()
         p.start()

+ 23 - 5
celery/tests/test_worker_job.py

@@ -300,11 +300,29 @@ class TestTaskWrapper(unittest.TestCase):
     def test_execute_using_pool(self):
     def test_execute_using_pool(self):
         tid = gen_unique_id()
         tid = gen_unique_id()
         tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
         tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
-        p = TaskPool(2)
-        p.start()
-        asyncres = tw.execute_using_pool(p)
-        self.assertEqual(asyncres.get(), 256)
-        p.stop()
+
+        class MockPool(object):
+            target = None
+            args = None
+            kwargs = None
+
+            def __init__(self, *args, **kwargs):
+                pass
+
+            def apply_async(self, target, args=None, kwargs=None,
+                    *margs, **mkwargs):
+                self.target = target
+                self.args = args
+                self.kwargs = kwargs
+
+        p = MockPool()
+        tw.execute_using_pool(p)
+        self.assertTrue(p.target)
+        self.assertEqual(p.args[0], mytask.name)
+        self.assertEqual(p.args[1], tid)
+        self.assertEqual(p.args[2], [4])
+        self.assertIn("f", p.args[3])
+        self.assertIn([4], p.args)
 
 
     def test_default_kwargs(self):
     def test_default_kwargs(self):
         tid = gen_unique_id()
         tid = gen_unique_id()

+ 10 - 9
celery/worker/pool.py

@@ -3,7 +3,7 @@
 Process Pools.
 Process Pools.
 
 
 """
 """
-from billiard.pool import Pool
+from billiard.pool import Pool, RUN
 from billiard.utils.functional import curry
 from billiard.utils.functional import curry
 
 
 from celery import log
 from celery import log
@@ -43,17 +43,18 @@ class TaskPool(object):
         Will pre-fork all workers so they're ready to accept tasks.
         Will pre-fork all workers so they're ready to accept tasks.
 
 
         """
         """
-        self._pool = DynamicPool(processes=self.limit,
-                                 initializer=self.initializer,
-                                 timeout=self.timeout,
-                                 soft_timeout=self.soft_timeout,
-                                 maxtasksperchild=self.maxtasksperchild)
+        self._pool = Pool(processes=self.limit,
+                          initializer=self.initializer,
+                          timeout=self.timeout,
+                          soft_timeout=self.soft_timeout,
+                          maxtasksperchild=self.maxtasksperchild)
 
 
     def stop(self):
     def stop(self):
         """Terminate the pool."""
         """Terminate the pool."""
-        self._pool.close()
-        self._pool.join()
-        self._pool = None
+        if self._pool is not None and self._pool._state == RUN:
+            self._pool.close()
+            self._pool.join()
+            self._pool = None
 
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
             errbacks=None, accept_callback=None, timeout_callback=None,
             errbacks=None, accept_callback=None, timeout_callback=None,