|
@@ -182,6 +182,7 @@ class Consumer(object):
|
|
|
self._restart_state = restart_state(maxR=5, maxT=1)
|
|
|
|
|
|
self._does_info = logger.isEnabledFor(logging.INFO)
|
|
|
+ self._limit_order = 0
|
|
|
self.on_task_request = on_task_request
|
|
|
self.on_task_message = set()
|
|
|
self.amqheartbeat_rate = self.app.conf.BROKER_HEARTBEAT_CHECKRATE
|
|
@@ -252,11 +253,17 @@ class Consumer(object):
|
|
|
else self.qos.increment_eventually)(
|
|
|
abs(index) * self.prefetch_multiplier)
|
|
|
|
|
|
+ def _limit_move_to_pool(self, request):
|
|
|
+ task_reserved(request)
|
|
|
+ self.on_task_request(request)
|
|
|
+
|
|
|
def _limit_task(self, request, bucket, tokens):
|
|
|
if not bucket.can_consume(tokens):
|
|
|
hold = bucket.expected_time(tokens)
|
|
|
+ pri = self._limit_order = (self._limit_order + 1) % 10
|
|
|
self.timer.call_after(
|
|
|
- hold, self._limit_task, (request, bucket, tokens),
|
|
|
+ hold, self._limit_move_to_pool, (request, ),
|
|
|
+ priority=pri,
|
|
|
)
|
|
|
else:
|
|
|
task_reserved(request)
|