Przeglądaj źródła

Good-bye TokenBucketQueue

Ask Solem 12 lat temu
rodzic
commit
f104349824

+ 0 - 1
celery/worker/__init__.py

@@ -98,7 +98,6 @@ class WorkController(configurated):
             'celery.worker.components:Consumer',
             'celery.worker.autoscale:WorkerComponent',
             'celery.worker.autoreload:WorkerComponent',
-            'celery.worker.mediator:WorkerComponent',
 
         ])
 

+ 1 - 0
celery/worker/components.py

@@ -262,5 +262,6 @@ class Consumer(bootsteps.StartStopStep):
             controller=w,
             hub=w.hub,
             worker_options=w.options,
+            disable_rate_limits=w.disable_rate_limits,
         )
         return c

+ 31 - 2
celery/worker/consumer.py

@@ -25,6 +25,7 @@ from billiard.exceptions import RestartFreqExceeded
 from kombu.common import QoS, ignore_errors
 from kombu.syn import _detect_environment
 from kombu.utils.encoding import safe_repr
+from kombu.utils.limits import TokenBucket
 
 from celery import bootsteps
 from celery.app import app_or_default
@@ -35,7 +36,7 @@ from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.utils.text import truncate
 from celery.utils.timer2 import default_timer, to_timestamp
-from celery.utils.timeutils import humanize_seconds, timezone
+from celery.utils.timeutils import humanize_seconds, timezone, rate
 
 from . import heartbeat, loops, pidbox
 from .state import task_reserved, maybe_shutdown, revoked
@@ -143,7 +144,7 @@ class Consumer(object):
                  init_callback=noop, hostname=None,
                  pool=None, app=None,
                  timer=None, controller=None, hub=None, amqheartbeat=None,
-                 worker_options=None, **kwargs):
+                 worker_options=None, disable_rate_limits=False, **kwargs):
         self.app = app_or_default(app)
         self.controller = controller
         self.ready_queue = ready_queue
@@ -160,6 +161,12 @@ class Consumer(object):
         self._does_info = logger.isEnabledFor(logging.INFO)
         self._quick_put = self.ready_queue.put
         self.amqheartbeat_rate = self.app.conf.BROKER_HEARTBEAT_CHECKRATE
+        self.disable_rate_limits = disable_rate_limits
+
+        # this contains a tokenbucket for each task type by name, used for
+        # rate limits, or None if rate limits are disabled for that task.
+        self.task_buckets = defaultdict(lambda: None)
+        self.reset_rate_limits()
 
         if hub:
             self.amqheartbeat = amqheartbeat
@@ -186,6 +193,24 @@ class Consumer(object):
         )
         self.namespace.apply(self, **dict(worker_options or {}, **kwargs))
 
+    def bucket_for_task(self, type):
+        limit = rate(getattr(type, 'rate_limit', None))
+        return TokenBucket(limit, capacity=1) if limit else None
+
+    def reset_rate_limits(self):
+        self.task_buckets.update(
+            (n, self.bucket_for_task(t)) for n, t in items(self.app._tasks)
+        )
+
+    def _limit_task(self, request, bucket, tokens):
+        if not bucket.can_consume(tokens):
+            hold = bucket.expected_time(tokens)
+            self.timer.apply_after(
+                hold * 1000.0, self._limit_task, (request, bucket, tokens),
+            )
+        else:
+            self._quick_put(request)
+
     def start(self):
         ns, loop = self.namespace, self.loop
         while ns.state != CLOSE:
@@ -345,6 +370,10 @@ class Consumer(object):
                 )
         else:
             task_reserved(task)
+            if not self.disable_rate_limits:
+                bucket = self.task_buckets[task.name]
+                if bucket:
+                    self._limit_task(task, bucket, 1)
             self._quick_put(task)
 
     def apply_eta_task(self, task):

+ 1 - 5
celery/worker/control.py

@@ -106,11 +106,7 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
                      task_name, exc_info=True)
         return {'error': 'unknown task'}
 
-    if not hasattr(panel.consumer.ready_queue, 'refresh'):
-        logger.error('Rate limit attempt, but rate limits disabled.')
-        return {'error': 'rate limits disabled'}
-
-    panel.consumer.ready_queue.refresh()
+    panel.consumer.reset_rate_limits()
 
     if not rate_limit:
         logger.info('Rate limits disabled for tasks of type %s', task_name)