Jelajahi Sumber

Refactored some of the isintance mess with TokenBucket Queue vs TokenBucketQueue.

Ask Solem 15 tahun lalu
induk
melakukan
8e3c23408a

+ 5 - 4
celery/tests/test_worker.py

@@ -1,5 +1,5 @@
 import unittest2 as unittest
-from Queue import Queue, Empty
+from Queue import Empty
 from datetime import datetime, timedelta
 from multiprocessing import get_logger
 
@@ -10,8 +10,9 @@ from billiard.serialization import pickle
 from celery import conf
 from celery.utils import gen_unique_id
 from celery.worker import WorkController
-from celery.worker.listener import CarrotListener, RUN
 from celery.worker.job import TaskWrapper
+from celery.worker.buckets import FastQueue
+from celery.worker.listener import CarrotListener, RUN
 from celery.worker.scheduler import Scheduler
 from celery.decorators import task as task_dec
 from celery.decorators import periodic_task as periodic_task_dec
@@ -125,7 +126,7 @@ def create_message(backend, **data):
 class TestCarrotListener(unittest.TestCase):
 
     def setUp(self):
-        self.ready_queue = Queue()
+        self.ready_queue = FastQueue()
         self.eta_schedule = Scheduler(self.ready_queue)
         self.logger = get_logger()
         self.logger.setLevel(0)
@@ -266,7 +267,7 @@ class TestCarrotListener(unittest.TestCase):
         self.assertTrue(found)
 
     def test_revoke(self):
-        ready_queue = Queue()
+        ready_queue = FastQueue()
         l = CarrotListener(ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()

+ 2 - 2
celery/worker/__init__.py

@@ -17,7 +17,7 @@ from celery.log import setup_logger, _hijack_multiprocessing_logger
 from celery.beat import EmbeddedClockService
 from celery.utils import noop, instantiate
 
-from celery.worker.buckets import TaskBucket
+from celery.worker.buckets import TaskBucket, FastQueue
 from celery.worker.scheduler import Scheduler
 
 
@@ -129,7 +129,7 @@ class WorkController(object):
 
         # Queues
         if conf.DISABLE_RATE_LIMITS:
-            self.ready_queue = Queue()
+            self.ready_queue = FastQueue()
         else:
             self.ready_queue = TaskBucket(task_registry=registry.tasks)
         self.eta_schedule = Scheduler(self.ready_queue, logger=self.logger)

+ 18 - 3
celery/worker/buckets.py

@@ -157,12 +157,10 @@ class TaskBucket(object):
         if task_name in self.buckets:
             task_queue = self._get_queue_for_type(task_name)
         else:
-            task_queue = task_type.rate_limit_queue_type()
+            task_queue = FastQueue()
 
         if rate_limit:
             task_queue = TokenBucketQueue(rate_limit, queue=task_queue)
-        else:
-            task_queue.expected_time = lambda: 0
 
         self.buckets[task_name] = task_queue
         return task_queue
@@ -196,6 +194,23 @@ class TaskBucket(object):
                 bucket.queue.clear()
 
 
+class FastQueue(Queue):
+    """:class:`Queue.Queue` supporting the interface of
+    :class:`TokenBucketQueue`."""
+
+    def clear(self):
+        return self.queue.clear()
+
+    def expected_time(self, tokens=1):
+        return 0
+
+    def can_consume(self, tokens=1):
+        return True
+
+    def wait(self, block=True):
+        return self.get(block=block)
+
+
 class TokenBucketQueue(object):
     """Queue with rate limited get operations.
 

+ 12 - 5
celery/worker/control/builtins.py

@@ -1,7 +1,9 @@
 from datetime import datetime
 
+from celery import conf
 from celery.registry import tasks
 from celery.worker.revoke import revoked
+from celery.worker.buckets import TaskBucket
 from celery.worker.control.registry import Panel
 
 TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
@@ -30,17 +32,22 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
     except KeyError:
         panel.logger.error("Rate limit attempt for unknown task %s" % (
             task_name, ))
-        return "unknown task"
+        return {"error": "unknown task"}
+
+    if conf.DISABLE_RATE_LIMITS:
+        panel.logger.error("Rate limit attempt, but rate limits disabled.")
+        return {"error": "rate limits disabled"}
 
     panel.listener.ready_queue.refresh()
 
     if not rate_limit:
         panel.logger.warn("Disabled rate limits for tasks of type %s" % (
                             task_name, ))
-    else:
-        panel.logger.warn("New rate limit for tasks of type %s: %s." % (
-                    task_name, rate_limit))
-    return True
+        return {"ok": "rate limit disabled successfully"}
+
+    panel.logger.warn("New rate limit for tasks of type %s: %s." % (
+                task_name, rate_limit))
+    return {"ok": "new rate limit set successfully"}
 
 
 @Panel.register

+ 2 - 6
celery/worker/listener.py

@@ -203,12 +203,8 @@ class CarrotListener(object):
                 "CarrotListener: Re-establishing connection to the broker...")
         self.stop_consumers()
 
-        try:
-            # TaskBucket supports clear directly.
-            self.ready_queue.clear()
-        except AttributeError:
-            # Use the underlying deque of regular Queue
-            self.ready_queue.queue.clear()
+        # Clear internal queues.
+        self.ready_queue.clear()
         self.eta_schedule.clear()
 
         self.connection = self._open_connection()