Browse Source

bugfix. fix _schedule_bucket_request bug. (#4854)

* bugfix. (#4853)

* bugfix. (#4853)

* add unittest
ideascf 6 năm trước cách đây
mục cha
commit
d3c1f65f52
2 tập tin đã thay đổi với 72 bổ sung42 xóa
  1. 27 32
      celery/worker/consumer/consumer.py
  2. 45 10
      t/unit/worker/test_consumer.py

+ 27 - 32
celery/worker/consumer/consumer.py

@@ -269,43 +269,38 @@ class Consumer(object):
         task_reserved(request)
         self.on_task_request(request)
 
-    def _on_bucket_wakeup(self, bucket, tokens):
-        try:
-            request = bucket.pop()
-        except IndexError:
-            pass
-        else:
-            self._limit_move_to_pool(request)
-            self._schedule_oldest_bucket_request(bucket, tokens)
-
-    def _schedule_oldest_bucket_request(self, bucket, tokens):
-        try:
-            request = bucket.pop()
-        except IndexError:
-            pass
-        else:
-            return self._schedule_bucket_request(request, bucket, tokens)
-
-    def _schedule_bucket_request(self, request, bucket, tokens):
-        bucket.can_consume(tokens)
-        bucket.add(request)
-        pri = self._limit_order = (self._limit_order + 1) % 10
-        hold = bucket.expected_time(tokens)
-        self.timer.call_after(
-            hold, self._on_bucket_wakeup, (bucket, tokens),
-            priority=pri,
-        )
+    def _schedule_bucket_request(self, bucket):
+        while True:
+            try:
+                request, tokens = bucket.pop()
+            except IndexError:
+                # no request, break
+                break
+
+            if bucket.can_consume(tokens):
+                self._limit_move_to_pool(request)
+                continue
+            else:
+                # requeue to head, keep the order.
+                bucket.contents.appendleft((request, tokens))
+
+                pri = self._limit_order = (self._limit_order + 1) % 10
+                hold = bucket.expected_time(tokens)
+                self.timer.call_after(
+                    hold, self._schedule_bucket_request, (bucket,),
+                    priority=pri,
+                )
+                # no tokens, break
+                break
 
     def _limit_task(self, request, bucket, tokens):
-        if bucket.contents:
-            return bucket.add(request)
-        return self._schedule_bucket_request(request, bucket, tokens)
+        bucket.add((request, tokens))
+        return self._schedule_bucket_request(bucket)
 
     def _limit_post_eta(self, request, bucket, tokens):
         self.qos.decrement_eventually()
-        if bucket.contents:
-            return bucket.add(request)
-        return self._schedule_bucket_request(request, bucket, tokens)
+        bucket.add((request, tokens))
+        return self._schedule_bucket_request(bucket)
 
     def start(self):
         blueprint = self.blueprint

+ 45 - 10
t/unit/worker/test_consumer.py

@@ -103,34 +103,69 @@ class test_Consumer:
         c.on_send_event_buffered()
         c.hub._ready.add.assert_called_with(c._flush_events)
 
-    def test_limit_task(self):
+    def test_schedule_bucket_request(self):
         c = self.get_consumer()
         c.timer = Mock()
 
         bucket = Mock()
         request = Mock()
+        bucket.pop = lambda: bucket.contents.popleft()
         bucket.can_consume.return_value = True
         bucket.contents = deque()
 
-        c._limit_task(request, bucket, 3)
-        bucket.can_consume.assert_called_with(3)
-        bucket.expected_time.assert_called_with(3)
-        c.timer.call_after.assert_called_with(
-            bucket.expected_time(), c._on_bucket_wakeup, (bucket, 3),
-            priority=c._limit_order,
-        )
+        with patch(
+            'celery.worker.consumer.consumer.Consumer._limit_move_to_pool'
+        ) as reserv:
+            bucket.contents.append((request, 3))
+            c._schedule_bucket_request(bucket)
+            bucket.can_consume.assert_called_with(3)
+            reserv.assert_called_with(request)
 
         bucket.can_consume.return_value = False
+        bucket.contents = deque()
         bucket.expected_time.return_value = 3.33
+        bucket.contents.append((request, 4))
         limit_order = c._limit_order
-        c._limit_task(request, bucket, 4)
+        c._schedule_bucket_request(bucket)
         assert c._limit_order == limit_order + 1
         bucket.can_consume.assert_called_with(4)
         c.timer.call_after.assert_called_with(
-            3.33, c._on_bucket_wakeup, (bucket, 4),
+            3.33, c._schedule_bucket_request, (bucket,),
             priority=c._limit_order,
         )
         bucket.expected_time.assert_called_with(4)
+        assert bucket.pop() == (request, 4)
+
+        bucket.contents = deque()
+        bucket.can_consume.reset_mock()
+        c._schedule_bucket_request(bucket)
+        bucket.can_consume.assert_not_called()
+
+    def test_limit_task(self):
+        c = self.get_consumer()
+        bucket = Mock()
+        request = Mock()
+
+        with patch(
+            'celery.worker.consumer.consumer.Consumer._schedule_bucket_request'
+        ) as reserv:
+            c._limit_task(request, bucket, 1)
+            bucket.add.assert_called_with((request, 1))
+            reserv.assert_called_with(bucket)
+
+    def test_post_eta(self):
+        c = self.get_consumer()
+        c.qos = Mock()
+        bucket = Mock()
+        request = Mock()
+
+        with patch(
+            'celery.worker.consumer.consumer.Consumer._schedule_bucket_request'
+        ) as reserv:
+            c._limit_post_eta(request, bucket, 1)
+            c.qos.decrement_eventually.assert_called_with()
+            bucket.add.assert_called_with((request, 1))
+            reserv.assert_called_with(bucket)
 
     def test_start_blueprint_raises_EMFILE(self):
         c = self.get_consumer()