Jelajahi Sumber

Tests passing

Ask Solem 11 tahun lalu
induk
melakukan
43e0335521

+ 5 - 4
celery/tests/bin/test_worker.py

@@ -26,6 +26,7 @@ from celery.tests.case import (
     WhateverIO,
     skip_if_pypy,
     skip_if_jython,
+    mock_module,
 )
 
 ensure_process_aware_logger()
@@ -257,13 +258,13 @@ class test_Worker(WorkerAppCase):
         self.assert_no_logging_side_effect()
 
     def test_include_argument(self):
-        worker1 = self.Worker(app=self.app, include='some.module')
-        self.assertListEqual(worker1.include, ['some.module'])
+        worker1 = self.Worker(app=self.app, include='os')
+        self.assertListEqual(worker1.include, ['os'])
         worker2 = self.Worker(app=self.app,
-                              include='some.module,another.package')
+                            include='os,sys')
         self.assertListEqual(
             worker2.include,
-            ['some.module', 'another.package'],
+            ['os', 'sys'],
         )
         self.Worker(app=self.app, include=['os', 'sys'])
 

+ 18 - 8
celery/tests/worker/test_autoscale.py

@@ -85,7 +85,8 @@ class test_Autoscaler(AppCase):
             def join(self, timeout=None):
                 self.joined = True
 
-        x = Scaler(self.pool, 10, 3)
+        worker = Mock(name='worker')
+        x = Scaler(self.pool, 10, 3, worker=worker)
         x._is_stopped.set()
         x.stop()
         self.assertTrue(x.joined)
@@ -96,7 +97,8 @@ class test_Autoscaler(AppCase):
 
     @sleepdeprived(autoscale)
     def test_body(self):
-        x = autoscale.Autoscaler(self.pool, 10, 3)
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
         x.body()
         self.assertEqual(x.pool.num_processes, 3)
         for i in range(20):
@@ -104,12 +106,14 @@ class test_Autoscaler(AppCase):
         x.body()
         x.body()
         self.assertEqual(x.pool.num_processes, 10)
+        self.assertTrue(worker.consumer.increment_prefetch_count.called)
         state.reserved_requests.clear()
         x.body()
         self.assertEqual(x.pool.num_processes, 10)
         x._last_action = time() - 10000
         x.body()
         self.assertEqual(x.pool.num_processes, 3)
+        self.assertTrue(worker.consumer.decrement_prefetch_count.called)
 
     def test_run(self):
 
@@ -120,14 +124,16 @@ class test_Autoscaler(AppCase):
                 self.scale_called = True
                 self._is_shutdown.set()
 
-        x = Scaler(self.pool, 10, 3)
+        worker = Mock(name='worker')
+        x = Scaler(self.pool, 10, 3, worker=worker)
         x.run()
         self.assertTrue(x._is_shutdown.isSet())
         self.assertTrue(x._is_stopped.isSet())
         self.assertTrue(x.scale_called)
 
     def test_shrink_raises_exception(self):
-        x = autoscale.Autoscaler(self.pool, 10, 3)
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
         x.scale_up(3)
         x._last_action = time() - 10000
         x.pool.shrink_raises_exception = True
@@ -135,7 +141,8 @@ class test_Autoscaler(AppCase):
 
     @patch('celery.worker.autoscale.debug')
     def test_shrink_raises_ValueError(self, debug):
-        x = autoscale.Autoscaler(self.pool, 10, 3)
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
         x.scale_up(3)
         x._last_action = time() - 10000
         x.pool.shrink_raises_ValueError = True
@@ -143,7 +150,8 @@ class test_Autoscaler(AppCase):
         self.assertTrue(debug.call_count)
 
     def test_update_and_force(self):
-        x = autoscale.Autoscaler(self.pool, 10, 3)
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
         self.assertEqual(x.processes, 3)
         x.force_scale_up(5)
         self.assertEqual(x.processes, 8)
@@ -165,7 +173,8 @@ class test_Autoscaler(AppCase):
         x.update(max=None, min=None)
 
     def test_info(self):
-        x = autoscale.Autoscaler(self.pool, 10, 3)
+        worker = Mock(name='worker')
+        x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
         info = x.info()
         self.assertEqual(info['max'], 10)
         self.assertEqual(info['min'], 3)
@@ -179,7 +188,8 @@ class test_Autoscaler(AppCase):
             def body(self):
                 self._is_shutdown.set()
                 raise OSError('foo')
-        x = _Autoscaler(self.pool, 10, 3)
+        worker = Mock(name='worker')
+        x = _Autoscaler(self.pool, 10, 3, worker=worker)
 
         stderr = Mock()
         p, sys.stderr = sys.stderr, stderr

+ 0 - 1
celery/tests/worker/test_consumer.py

@@ -182,7 +182,6 @@ class test_Tasks(AppCase):
         tasks = Tasks(c)
         self.assertIsNone(c.task_consumer)
         self.assertIsNone(c.qos)
-        self.assertEqual(c.initial_prefetch_count, 2)
 
         c.task_consumer = Mock()
         tasks.stop(c)

+ 8 - 0
celery/tests/worker/test_control.py

@@ -44,6 +44,8 @@ class Consumer(consumer.Consumer):
         self.event_dispatcher = Mock()
         self.controller = WorkController()
         self.task_consumer = Mock()
+        self.prefetch_multiplier = 1
+        self.initial_prefetch_count = 1
 
         from celery.concurrency.base import BasePool
         self.pool = BasePool(10)
@@ -270,13 +272,19 @@ class test_ControlPanel(AppCase):
                 self.size -= n
 
         consumer = Consumer(self.app)
+        consumer.prefetch_multiplier = 8
+        consumer.qos = Mock(name='qos')
         consumer.pool = MockPool()
         panel = self.create_panel(consumer=consumer)
 
         panel.handle('pool_grow')
         self.assertEqual(consumer.pool.size, 2)
+        consumer.qos.increment_eventually.assert_called_with(8)
+        self.assertEqual(consumer.initial_prefetch_count, 9)
         panel.handle('pool_shrink')
         self.assertEqual(consumer.pool.size, 1)
+        consumer.qos.decrement_eventually.assert_called_with(8)
+        self.assertEqual(consumer.initial_prefetch_count, 1)
 
         panel.state.consumer = Mock()
         panel.state.consumer.controller = Mock()

+ 5 - 3
celery/tests/worker/test_loops.py

@@ -140,16 +140,18 @@ class test_asynloop(AppCase):
         _, on_task, body, msg, strategy = self.task_context(self.add.s(2, 2))
         on_task(body, msg)
         strategy.assert_called_with(
-            msg, body, msg.ack_log_error, msg.reject_log_error,
+            msg, body, msg.ack_log_error, msg.reject_log_error, [],
         )
 
     def test_on_task_received_executes_on_task_message(self):
         cbs = [Mock(), Mock(), Mock()]
-        _, on_task, body, msg, _ = self.task_context(
+        _, on_task, body, msg, strategy = self.task_context(
             self.add.s(2, 2), on_task_message=cbs,
         )
         on_task(body, msg)
-        [cb.assert_called_with() for cb in cbs]
+        strategy.assert_called_with(
+            msg, body, msg.ack_log_error, msg.reject_log_error, cbs,
+        )
 
     def test_on_task_message_missing_name(self):
         x, on_task, body, msg, strategy = self.task_context(self.add.s(2, 2))

+ 4 - 4
celery/tests/worker/test_request.py

@@ -392,7 +392,7 @@ class test_Request(AppCase):
             self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
         )
         with assert_signal_called(
-                task_revoked, sender=job.task,
+                task_revoked, sender=job.task, request=job,
                 terminated=True, expired=False, signum=signum):
             job.time_start = time.time()
             job.worker_pid = 313
@@ -416,7 +416,7 @@ class test_Request(AppCase):
             expires=datetime.utcnow() - timedelta(days=1),
         )
         with assert_signal_called(
-                task_revoked, sender=job.task,
+                task_revoked, sender=job.task, request=job,
                 terminated=False, expired=True, signum=None):
             job.revoked()
             self.assertIn(job.id, revoked)
@@ -496,7 +496,7 @@ class test_Request(AppCase):
             self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
         )
         with assert_signal_called(
-                task_revoked, sender=job.task,
+                task_revoked, sender=job.task, request=job,
                 terminated=False, expired=False, signum=None):
             revoked.add(job.id)
             self.assertTrue(job.revoked())
@@ -552,7 +552,7 @@ class test_Request(AppCase):
             self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
         )
         with assert_signal_called(
-                task_revoked, sender=job.task,
+                task_revoked, sender=job.task, request=job,
                 terminated=True, expired=False, signum=signum):
             job.terminate(pool, signal='KILL')
             self.assertFalse(pool.terminate_job.call_count)

+ 1 - 1
celery/tests/worker/test_strategy.py

@@ -34,7 +34,7 @@ class test_default_strategy(AppCase):
         def __call__(self, **kwargs):
             return self.s(
                 self.message, self.body,
-                self.message.ack, self.message.reject, **kwargs
+                self.message.ack, self.message.reject, [], **kwargs
             )
 
         def was_reserved(self):