Forráskód Böngészése

Added broadcast() and rate_limit() remote control utilities + remote shutdown of workers.

Ask Solem 15 éve
szülő
commit
248e77c138
3 módosított fájl, 61 hozzáadás és 10 törlés
  1. 4 6
      celery/messaging.py
  2. 51 3
      celery/task/control.py
  3. 6 1
      celery/worker/control.py

+ 4 - 6
celery/messaging.py

@@ -91,12 +91,10 @@ class BroadcastPublisher(Publisher):
     exchange_type = "fanout"
     exchange_type = "fanout"
     routing_key = ""
     routing_key = ""
 
 
-    def revoke(self, task_id):
-        self.send("revoke", dict(task_id=task_id))
-
-    def send(self, type, data):
-        data["command"] = type
-        super(BroadcastPublisher, self).send({"control": data})
+    def send(self, type, arguments, destination=None):
+        arguments["command"] = type
+        arguments["destination"] = destination
+        super(BroadcastPublisher, self).send({"control": arguments})
 
 
 
 
 class BroadcastConsumer(Consumer):
 class BroadcastConsumer(Consumer):

+ 51 - 3
celery/task/control.py

@@ -19,17 +19,65 @@ def discard_all(connection=None, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
         consumer.close()
         consumer.close()
 
 
 
 
-@with_connection
-def revoke(task_id, connection=None,
+def revoke(task_id, destination=None, connection=None,
         connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
         connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
     """Revoke a task by id.
     """Revoke a task by id.
 
 
     If a task is revoked, the workers will ignore the task and not execute
     If a task is revoked, the workers will ignore the task and not execute
     it after all.
     it after all.
 
 
+    :param task_id: Id of the task to revoke.
+    :keyword destination: If set, a list of the hosts to send the command to,
+        when empty broadcast to all workers.
+    :keyword connection: Custom broker connection to use, if not set, a connection
+        will be established automatically.
+    :keyword connect_timeout: Timeout for new connection if a custom
+        connection is not provided.
+
+    """
+    return broadcast("revoke", destination=destination,
+                               arguments={"task_id": task_id})
+
+
+def rate_limit(task_name, rate_limit, destination=None, connection=None,
+        connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
+    """Set rate limit for task by type.
+
+    :param task_name: Type of task to change rate limit for.
+    :param rate_limit: The rate limit as tasks per second, or a rate limit
+      string (``"100/m"``, etc. see :attr:`celery.task.base.Task.rate_limit`
+      for more information).
+    :keyword destination: If set, a list of the hosts to send the command to,
+        when empty broadcast to all workers.
+    :keyword connection: Custom broker connection to use, if not set, a connection
+        will be established automatically.
+    :keyword connect_timeout: Timeout for new connection if a custom
+        connection is not provided.
+
     """
     """
+    return broadcast("rate_limit", destination=destination,
+                                   arguments={"task_name": task_name,
+                                              "rate_limit": rate_limit})
+
+@with_connection
+def broadcast(command, arguments=None, destination=None, connection=None,
+        connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
+    """Broadcast a control command to the celery workers.
+
+    :param command: Name of command to send.
+    :param arguments: Keyword arguments for the command.
+    :keyword destination: If set, a list of the hosts to send the command to,
+        when empty broadcast to all workers.
+    :keyword connection: Custom broker connection to use, if not set, a connection
+        will be established automatically.
+    :keyword connect_timeout: Timeout for new connection if a custom
+        connection is not provided.
+
+    """
+    arguments = arguments or {}
+
     broadcast = BroadcastPublisher(connection)
     broadcast = BroadcastPublisher(connection)
     try:
     try:
-        broadcast.revoke(task_id)
+        broadcast.send(command, arguments, destination=destination)
     finally:
     finally:
         broadcast.close()
         broadcast.close()

+ 6 - 1
celery/worker/control.py

@@ -51,6 +51,11 @@ class Control(object):
             self.logger.warn("New rate limit for tasks of type %s: %s." % (
             self.logger.warn("New rate limit for tasks of type %s: %s." % (
                                 task_name, rate_limit))
                                 task_name, rate_limit))
 
 
+    @expose
+    def shutdown(self, **kwargs):
+        self.logger.critical("Got shutdown from remote.")
+        raise SystemExit
+
 
 
 class ControlDispatch(object):
 class ControlDispatch(object):
     """Execute worker control panel commands."""
     """Execute worker control panel commands."""
@@ -76,7 +81,7 @@ class ControlDispatch(object):
         message = dict(message) # don't modify callers message.
         message = dict(message) # don't modify callers message.
         command = message.pop("command")
         command = message.pop("command")
         destination = message.pop("destination", None)
         destination = message.pop("destination", None)
-        if not destination or destination == self.hostname:
+        if not destination or self.hostname in destination:
             return self.execute(command, message)
             return self.execute(command, message)
 
 
     def execute(self, command, kwargs):
     def execute(self, command, kwargs):