Sfoglia il codice sorgente

Broadcast control commands now handled in a common place + added the ability to set rate limits remotely.

Ask Solem 15 anni fa
parent
commit
4fc94c2239
2 ha cambiato i file con 49 aggiunte e 6 eliminazioni
  1. 44 0
      celery/worker/control.py
  2. 5 6
      celery/worker/listener.py

+ 44 - 0
celery/worker/control.py

@@ -0,0 +1,44 @@
+from celery.worker.revoke import revoked
+from celery.registry import tasks
+
+
+class Control(object):
+
+    def __init__(self, logger):
+        self.logger = logger
+
+    def revoke(self, task_id, **kwargs):
+        revoked.add(task_id)
+        self.logger.warn("Task %s revoked." % task_id)
+
+    def rate_limit(self, task_name, rate_limit):
+        try:
+            tasks[task_name].rate_limit = rate_limit
+        except KeyError:
+            return
+
+        if not rate_limit:
+            self.logger.warn("Disabled rate limits for tasks of type %s" % (
+                                task_name))
+        else:
+            self.logger.warn("New rate limit for tasks of type %s: %s." % (
+                                task_name, rate_limit))
+
+
+class ControlDispatch(object):
+    panel_cls = Control
+
+    def __init__(self, logger):
+        self.logger = logger
+        self.panel = self.panel_cls(self.logger)
+
+    def dispatch(self, command, kwargs):
+        try:
+            control = getattr(self.panel, command)
+        except AttributeError:
+            self.logger.error("No such control command: %s" % command)
+        else:
+            return control(**kwargs)
+
+
+

+ 5 - 6
celery/worker/listener.py

@@ -9,6 +9,7 @@ from celery import signals
 from celery.utils import retry_over_time
 from celery.worker.job import TaskWrapper
 from celery.worker.revoke import revoked
+from celery.worker.control import ControlDispatch
 from celery.worker.heartbeat import Heart
 from celery.events import EventDispatcher
 from celery.messaging import get_consumer_set, BroadcastConsumer
@@ -49,6 +50,7 @@ class CarrotListener(object):
         self.eta_scheduler = eta_scheduler
         self.send_events = send_events
         self.logger = logger
+        self.control_dispatch = ControlDispatch(logger=logger)
         self.prefetch_count = SharedCounter(initial_prefetch_count)
         self.event_dispatcher = None
         self.heart = None
@@ -89,12 +91,9 @@ class CarrotListener(object):
 
             wait_for_message()
 
-    def on_control_command(self, command):
-        if command["command"] == "revoke":
-            revoke_uuid = command["task_id"]
-            revoked.add(revoke_uuid)
-            self.logger.warn("Task %s marked as revoked." % revoke_uuid)
-            return
+    def on_control_command(self, message):
+        command = message.pop("command")
+        return self.control_dispatch.dispatch(command, message)
 
     def on_task(self, task, eta=None):
         """Handle received task.