浏览代码

Use the timer for rate limits when on the eventloop (amqp/redis)

Ask Solem 12 年之前
父节点
当前提交
4340630ba8
共有 3 个文件被更改,包括 57 次插入14 次删除
  1. 11 12
      celery/worker/__init__.py
  2. 45 1
      celery/worker/buckets.py
  3. 1 1
      celery/worker/mediator.py

+ 11 - 12
celery/worker/__init__.py

@@ -37,7 +37,7 @@ from celery.utils.timer2 import Schedule
 
 from . import bootsteps
 from . import state
-from .buckets import TaskBucket, FastQueue
+from .buckets import TaskBucket, AsyncTaskBucket, FastQueue
 from .hub import Hub, BoundedSemaphore
 
 #: Worker states
@@ -205,23 +205,22 @@ class Queues(bootsteps.Component):
     requires = ('ev', )
 
     def create(self, w):
+        BucketType = TaskBucket
         w.start_mediator = True
         if not w.pool_cls.rlimit_safe:
             w.disable_rate_limits = True
+        process_task = w.process_task
+        if w.use_eventloop:
+            BucketType = AsyncTaskBucket
+            if w.pool_putlocks and w.pool_cls.uses_semaphore:
+                process_task = w.process_task_sem
         if w.disable_rate_limits:
             w.ready_queue = FastQueue()
-            if w.use_eventloop:
-                w.start_mediator = False
-                if w.pool_putlocks and w.pool_cls.uses_semaphore:
-                    w.ready_queue.put = w.process_task_sem
-                else:
-                    w.ready_queue.put = w.process_task
-            elif not w.pool_cls.requires_mediator:
-                # just send task directly to pool, skip the mediator.
-                w.ready_queue.put = w.process_task
-                w.start_mediator = False
+            w.ready_queue.put = process_task
         else:
-            w.ready_queue = TaskBucket(task_registry=w.app.tasks)
+            w.ready_queue = BucketType(
+                task_registry=w.app.tasks, callback=process_task, worker=w,
+            )
 
 
 class EvLoop(bootsteps.StartStopComponent):

+ 45 - 1
celery/worker/buckets.py

@@ -31,6 +31,48 @@ class RateLimitExceeded(Exception):
     """The token buckets rate limit has been exceeded."""
 
 
+class AsyncTaskBucket(object):
+
+    def __init__(self, task_registry, callback=None, worker=None):
+        self.task_registry = task_registry
+        self.callback = callback
+        self.worker = worker
+        self.buckets = {}
+
+        for name in self.task_registry.iterkeys():
+            self.add_task_type(name)
+
+    def cont(self, request, bucket, tokens):
+        if not bucket.can_consume(tokens):
+            hold = bucket.expected_time(tokens)
+            self.worker.timer.apply_after(hold * 1000.0,
+                self.cont, (request, bucket, tokens))
+        else:
+            self.callback(request)
+
+    def put(self, request):
+        name = request.name
+        try:
+            bucket = self.buckets[name]
+        except KeyError:
+            bucket = self.add_bucket_for_type(name)
+        if not bucket:
+            return self.callback(request)
+        return self.cont(request, bucket, 1)
+
+    def add_task_type(self, name):
+        task_type = self.task_registry[name]
+        limit = getattr(task_type, 'rate_limit', None)
+        limit = timeutils.rate(limit)
+        bucket = self.buckets[name] = (
+            TokenBucket(limit, capacity=1) if limit else None
+        )
+        return bucket
+
+    def clear(self):
+        pass
+
+
 class TaskBucket(object):
     """This is a collection of token buckets, each task type having
     its own token bucket.  If the task type doesn't have a rate limit,
@@ -58,13 +100,15 @@ class TaskBucket(object):
 
     """
 
-    def __init__(self, task_registry):
+    def __init__(self, task_registry, callback=None, worker=None):
         self.task_registry = task_registry
         self.buckets = {}
         self.init_with_registry()
         self.immediate = deque()
         self.mutex = threading.Lock()
         self.not_empty = threading.Condition(self.mutex)
+        self.callback = callback
+        self.worker = worker
 
     def put(self, request):
         """Put a :class:`~celery.worker.job.Request` into

+ 1 - 1
celery/worker/mediator.py

@@ -36,7 +36,7 @@ class WorkerComponent(StartStopComponent):
         w.mediator = None
 
     def include_if(self, w):
-        return w.start_mediator
+        return w.start_mediator and not w.use_eventloop
 
     def create(self, w):
         m = w.mediator = self.instantiate(w.mediator_cls, w.ready_queue,