Selaa lähdekoodia

Improve Celery command, adding autoscale/grow/shink/time_limit++

Ask Solem 13 vuotta sitten
vanhempi
commit
5822cc11e8
6 muutettua tiedostoa jossa 273 lisäystä ja 102 poistoa
  1. 6 0
      celery/app/amqp.py
  2. 75 43
      celery/app/control.py
  3. 20 1
      celery/bin/base.py
  4. 159 48
      celery/bin/celery.py
  5. 11 8
      celery/worker/control.py
  6. 2 2
      docs/userguide/workers.rst

+ 6 - 0
celery/app/amqp.py

@@ -121,6 +121,12 @@ class Queues(dict):
         if wanted:
             self._consume_from = dict((name, self[name]) for name in wanted)
 
+    def select_remove(self, queue):
+        if self._consume_from is None:
+            self.select_subset(k for k in self.keys() if k != queue)
+        else:
+            self._consume_from.pop(queue, None)
+
     def new_missing(self, name):
         return Queue(name, Exchange(name), name)
 

+ 75 - 43
celery/app/control.py

@@ -76,24 +76,9 @@ class Inspect(object):
         return self._request("dump_tasks")
     registered_tasks = registered
 
-    def enable_events(self):
-        return self._request("enable_events")
-
-    def disable_events(self):
-        return self._request("disable_events")
-
     def ping(self):
         return self._request("ping")
 
-    def add_consumer(self, queue, exchange=None, exchange_type="direct",
-            routing_key=None, **options):
-        return self._request("add_consumer", queue=queue, exchange=exchange,
-                             exchange_type=exchange_type,
-                             routing_key=routing_key, **options)
-
-    def cancel_consumer(self, queue, **kwargs):
-        return self._request("cancel_consumer", queue=queue, **kwargs)
-
     def active_queues(self):
         return self._request("active_queues")
 
@@ -124,7 +109,7 @@ class Control(object):
 
     def revoke(self, task_id, destination=None, terminate=False,
             signal="SIGTERM", **kwargs):
-        """Revoke a task by id.
+        """Tell all (or specific) workers to revoke a task by id.
 
         If a task is revoked, the workers will ignore the task and
         not execute it after all.
@@ -134,13 +119,8 @@ class Control(object):
             on the task (if any).
         :keyword signal: Name of signal to send to process if terminate.
             Default is TERM.
-        :keyword destination: If set, a list of the hosts to send the
-            command to, when empty broadcast to all workers.
-        :keyword connection: Custom broker connection to use, if not set,
-            a connection will be established automatically.
-        :keyword reply: Wait for and return the reply.
-        :keyword timeout: Timeout in seconds to wait for the reply.
-        :keyword limit: Limit number of replies.
+
+        See :meth:`broadcast` for supported keyword arguments.
 
         """
         return self.broadcast("revoke", destination=destination,
@@ -149,37 +129,27 @@ class Control(object):
                                          "signal": signal}, **kwargs)
 
     def ping(self, destination=None, timeout=1, **kwargs):
-        """Ping workers.
+        """Ping all (or specific) workers.
 
         Returns answer from alive workers.
 
-        :keyword destination: If set, a list of the hosts to send the
-            command to, when empty broadcast to all workers.
-        :keyword connection: Custom broker connection to use, if not set,
-            a connection will be established automatically.
-        :keyword reply: Wait for and return the reply.
-        :keyword timeout: Timeout in seconds to wait for the reply.
-        :keyword limit: Limit number of replies.
+        See :meth:`broadcast` for supported keyword arguments.
 
         """
         return self.broadcast("ping", reply=True, destination=destination,
                               timeout=timeout, **kwargs)
 
     def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
-        """Set rate limit for task by type.
+        """Tell all (or specific) workers to set a new rate limit
+        for task by type.
 
         :param task_name: Name of task to change rate limit for.
         :param rate_limit: The rate limit as tasks per second, or a rate limit
             string (`"100/m"`, etc.
             see :attr:`celery.task.base.Task.rate_limit` for
             more information).
-        :keyword destination: If set, a list of the hosts to send the
-            command to, when empty broadcast to all workers.
-        :keyword connection: Custom broker connection to use, if not set,
-            a connection will be established automatically.
-        :keyword reply: Wait for and return the reply.
-        :keyword timeout: Timeout in seconds to wait for the reply.
-        :keyword limit: Limit number of replies.
+
+        See :meth:`broadcast` for supported keyword arguments.
 
         """
         return self.broadcast("rate_limit", destination=destination,
@@ -187,8 +157,46 @@ class Control(object):
                                          "rate_limit": rate_limit},
                               **kwargs)
 
+    def add_consumer(self, queue, exchange=None, exchange_type="direct",
+            routing_key=None, **options):
+        """Tell all (or specific) workers to start consuming from a new queue.
+
+        Only the queue name is required as if only the queue is specified
+        then the exchange/routing key will be set to the same name (
+        like automatic queues do).
+
+        .. note::
+
+            This command does not respect the default queue/exchange
+            options in the configuration.
+
+        :param queue: Name of queue to start consuming from.
+        :keyword exchange: Optional name of exchange.
+        :keyword exchange_type: Type of exchange (defaults to "direct")
+            command to, when empty broadcast to all workers.
+        :keyword routing_key: Optional routing key.
+
+        See :meth:`broadcast` for supported keyword arguments.
+
+        """
+        return self.broadcast("add_consumer",
+                arguments={"queue": queue, "exchange": exchange,
+                           "exchange_type": exchange_type,
+                           "routing_key": routing_key}, **options)
+
+    def cancel_consumer(self, queue, **kwargs):
+        """Tell all (or specific) workers to stop consuming from ``queue``.
+
+        Supports the same keyword arguments as :meth:`broadcast`.
+
+        """
+        return self.broadcast("cancel_consumer",
+                arguments={"queue": queue}, **kwargs)
+
+
     def time_limit(self, task_name, soft=None, hard=None, **kwargs):
-        """Set time limits for task by type.
+        """Tell all (or specific) workers to set time limits for
+        a task by type.
 
         :param task_name: Name of task to change time limits for.
         :keyword soft: New soft time limit (in seconds).
@@ -199,12 +207,35 @@ class Control(object):
         """
         return self.broadcast("time_limit",
                               arguments={"task_name": task_name,
-                                         "hard": hard, "soft": soft},
-                              **kwargs)
+                                         "hard": hard, "soft": soft}, **kwargs)
+
+    def enable_events(self, destination=None, **kwargs):
+        """Tell all (or specific) workers to enable events."""
+        return self.broadcast("enable_events", {}, destination, **kwargs)
+
+    def disable_events(self, destination=None, **kwargs):
+        """Tell all (or specific) workers to enable events."""
+        return self.broadcast("disable_events", {}, destination, **kwargs)
+
+    def pool_grow(self, n=1, destination=None, **kwargs):
+        """Tell all (or specific) workers to grow the pool by ``n``.
+
+        Supports the same arguments as :meth:`broadcast`.
+
+        """
+        return self.broadcast("pool_grow", {}, destination, **kwargs)
+
+    def pool_shrink(self, n=1, destination=None, **kwargs):
+        """Tell all (or specific) workers to shrink the pool by ``n``.
+
+        Supports the same arguments as :meth:`broadcast`.
+
+        """
+        return self.broadcast("pool_shrink", {}, destination, **kwargs)
 
     def broadcast(self, command, arguments=None, destination=None,
             connection=None, reply=False, timeout=1, limit=None,
-            callback=None, channel=None):
+            callback=None, channel=None, **extra_kwargs):
         """Broadcast a control command to the celery workers.
 
         :param command: Name of command to send.
@@ -223,6 +254,7 @@ class Control(object):
         with self.app.default_connection(connection) as conn:
             if channel is None:
                 channel = conn.default_channel
+            arguments = dict(arguments, **extra_kwargs)
             return self.mailbox(conn)._broadcast(command, arguments,
                                                  destination, reply, timeout,
                                                  limit, callback,

+ 20 - 1
celery/bin/base.py

@@ -89,6 +89,17 @@ find_long_opt = re.compile(r'.+?(--.+?)(?:\s|,|$)')
 find_rst_ref = re.compile(r':\w+:`(.+?)`')
 
 
+class Parser(OptionParser):
+
+    def format_epilog(self, *args, **kwargs):
+        return ""
+
+    def print_help(self, *args, **kwargs):
+        OptionParser.print_help(self, *args, **kwargs)
+        if self.epilog:
+            print("\n" + self.epilog)
+
+
 class Command(object):
     """Base class for command line applications.
 
@@ -96,6 +107,8 @@ class Command(object):
     :keyword get_app: Callable returning the current app if no app provided.
 
     """
+    Parser = Parser
+
     #: Arg list used in help.
     args = ''
 
@@ -126,7 +139,11 @@ class Command(object):
     #: Default configuration namespace.
     namespace = "celery"
 
-    Parser = OptionParser
+    #: Text to print at end of --help
+    epilog = None
+
+    #: Text to print in --help before option list.
+    description = ''
 
     def __init__(self, app=None, get_app=None):
         self.app = app
@@ -207,6 +224,8 @@ class Command(object):
         return self.prepare_parser(self.Parser(prog=prog_name,
                            usage=self.usage(command),
                            version=self.version,
+                           epilog=self.epilog,
+                           description=self.description,
                            option_list=(self.preload_options +
                                         self.get_options())))
 

+ 159 - 48
celery/bin/celery.py

@@ -22,12 +22,28 @@ HELP = """
 Type '%(prog_name)s <command> --help' for help using
 a specific command.
 
-Available commands:
+--- Commands ---
+
 %(commands)s
 """
 
 commands = {}
 
+command_classes = (
+    ("Main",
+        ["worker", "events", "beat", "shell", "amqp", "help"],
+        "green",
+    ),
+    ("Remote Control",
+        ["status", "inspect", "control"],
+        "blue",
+    ),
+    ("Utils",
+        ["purge", "list", "migrate", "apply", "result", "report"],
+        None,
+    ),
+)
+
 
 class Error(Exception):
 
@@ -52,6 +68,7 @@ class Command(BaseCommand):
     version = __version__
     prog_name = "celery"
     show_body = True
+    leaf = True
 
     option_list = (
         Option("--quiet", "-q", action="store_true"),
@@ -304,61 +321,72 @@ class result(Command):
 result = command(result)
 
 
-class rate_limit(Command):
-    args = "<task_name> <rate_limit (e.g. 10/m for 10 in a second)>"
-
-    def run(self, task_name, limit, **kwargs):
-        self.say_chat("<-", "rate_limit")
-        replies = self.app.control.rate_limit(task_name, limit, reply=True)
-        for reply in replies:
-            self.say_remote_command_reply(reply)
-rate_limit = command(rate_limit)
-
-
-class inspect(Command):
-    choices = {"active": 1.0,
-               "active_queues": 1.0,
-               "scheduled": 1.0,
-               "reserved": 1.0,
-               "stats": 1.0,
-               "revoked": 1.0,
-               "registered_tasks": 1.0,  # alias to registered
-               "registered": 1.0,
-               "enable_events": 1.0,
-               "disable_events": 1.0,
-               "ping": 0.2,
-               "add_consumer": 1.0,
-               "cancel_consumer": 1.0,
-               "report": 1.0}
+class _RemoteControl(Command):
+    name = None
+    choices = None
+    leaf = False
     option_list = Command.option_list + (
                 Option("--timeout", "-t", type="float",
                     help="Timeout in seconds (float) waiting for reply"),
                 Option("--destination", "-d",
                     help="Comma separated list of destination node names."))
 
+    @classmethod
+    def get_command_info(self, command, color=None):
+        try:
+            # see if it uses args.
+            meth = getattr(self, command)
+            return "%s %s" % (color(command), meth.__doc__)
+        except AttributeError:
+            return str(color(command))
+
+    @classmethod
+    def list_commands(self, indent=0, prefix="", color=None):
+        color = color if color else lambda x: x
+        prefix = prefix + " " if prefix else ""
+        return "\n".join(text.indent(prefix
+                    + self.get_command_info(c, color), indent)
+                        for c in sorted(self.choices))
+
+    @property
+    def epilog(self):
+        return "\n".join([
+            "[Commands]",
+            self.list_commands()
+        ])
+
     def usage(self, command):
-        return "%%prog %s [options] %s [%s]" % (
-                command, self.args, "|".join(self.choices.keys()))
+        return "%%prog %s [options] %s <command> [arg1 .. argN]" % (
+                command, self.args)
+
+    def call(self, *args, **kwargs):
+        raise NotImplementedError("get_obj")
 
     def run(self, *args, **kwargs):
         if not args:
-            raise Error("Missing inspect command. See --help")
-        command = args[0]
-        if command == "help":
-            raise Error("Did you mean 'inspect --help'?")
-        if command not in self.choices:
-            raise Error("Unknown inspect command: %s" % command)
+            raise Error("Missing %s method. See --help" % self.name)
+        return self.do_call_method(args, **kwargs)
+
+    def do_call_method(self, args, **kwargs):
+        method = args[0]
+        if method == "help":
+            raise Error("Did you mean '%s --help'?" % self.name)
+        if method not in self.choices:
+            raise Error("Unknown %s method %s" % (self.name, method))
 
         destination = kwargs.get("destination")
-        timeout = kwargs.get("timeout") or self.choices[command]
+        timeout = kwargs.get("timeout") or self.choices[method]
         if destination and isinstance(destination, basestring):
             destination = map(str.strip, destination.split(","))
 
-        self.say_chat("<-", command)
-        i = self.app.control.inspect(destination=destination,
-                                     timeout=timeout,
-                                     callback=self.say_remote_command_reply)
-        replies = getattr(i, command)(*args[1:])
+        try:
+            handler = getattr(self, method)
+        except AttributeError:
+            handler = self.call
+
+        replies = handler(method, *args[1:],
+                          timeout=timeout, destination=destination,
+                          callback=self.say_remote_command_reply)
         if not replies:
             raise Error("No nodes replied within time constraint.",
                         status=EX_UNAVAILABLE)
@@ -372,9 +400,74 @@ class inspect(Command):
         self.out(c.reset(dirstr, title))
         if body and self.show_body:
             self.out(body)
+
+
+class inspect(_RemoteControl):
+    name = "inspect"
+    choices = {"active": 1.0,
+               "active_queues": 1.0,
+               "scheduled": 1.0,
+               "reserved": 1.0,
+               "stats": 1.0,
+               "revoked": 1.0,
+               "registered_tasks": 1.0,  # alias to registered
+               "registered": 1.0,
+               "ping": 0.2,
+               "report": 1.0}
+
+    def call(self, method, *args, **options):
+        i = self.app.control.inspect(**options)
+        return getattr(i, method)(*args)
 inspect = command(inspect)
 
 
+class control(_RemoteControl):
+    name = "control"
+    choices = {"enable_events": 1.0,
+               "disable_events": 1.0,
+               "add_consumer": 1.0,
+               "cancel_consumer": 1.0,
+               "rate_limit": 1.0,
+               "time_limit": 1.0,
+               "autoscale": 1.0,
+               "pool_grow": 1.0,
+               "pool_shrink": 1.0}
+
+    def call(self, method, *args, **options):
+        return getattr(self.app.control, method)(*args, reply=True, **options)
+
+    def pool_grow(self, method, n=1, **kwargs):
+        """[N=1]"""
+        return self.call(method, n, **kwargs)
+
+    def pool_shrink(self, method, n=1, **kwargs):
+        """[N=1]"""
+        return self.call(method, n, **kwargs)
+
+    def autoscale(self, method, max=None, min=None, **kwargs):
+        """[max] [min]"""
+        return self.call(method, max, min, **kwargs)
+
+    def rate_limit(self, method, task_name, rate_limit, **kwargs):
+        """<task_name> <rate_limit (e.g. 10/m for 10 in a second)>"""
+        return self.call(method, task_name, rate_limit, **kwargs)
+
+    def time_limit(self, method, task_name, soft, hard=None, **kwargs):
+        """<task_name> <soft_secs> [hard_secs]"""
+        return self.call(method, task_name, soft, hard, **kwargs)
+
+    def add_consumer(self, method, queue, exchange=None,
+            exchange_type="direct", routing_key=None, **kwargs):
+        """<queue> [exchange [exchange_type [routing_key]]]"""
+        return self.call(method, queue, exchange,
+                         exchange_type, routing_key, **kwargs)
+
+    def cancel_consumer(self, method, queue, **kwargs):
+        """<queue>"""
+        return self.call(method, queue, **kwargs)
+control = command(control)
+
+
 class status(Command):
     option_list = inspect.option_list
 
@@ -506,12 +599,6 @@ class shell(Command):  # pragma: no cover
 shell = command(shell)
 
 
-def commandlist(indent=0):
-    return (text.indent(command, indent)
-                for command in sorted(commands,
-                    key=lambda k: commands[k].sortpri or k))
-
-
 class help(Command):
 
     def usage(self, command):
@@ -520,7 +607,7 @@ class help(Command):
     def run(self, *args, **kwargs):
         self.parser.print_help()
         self.out(HELP % {"prog_name": self.prog_name,
-                         "commands": "\n".join(commandlist(ident=4))})
+                         "commands": CeleryCommand.list_commands()})
 
         return EX_USAGE
 help = command(help)
@@ -580,6 +667,30 @@ class CeleryCommand(BaseCommand):
         except KeyboardInterrupt:
             sys.exit(EX_FAILURE)
 
+    @classmethod
+    def get_command_info(self, command, indent=0, color=None):
+        colored = term.colored().names[color] if color else lambda x: x
+        obj = self.commands[command]
+        if obj.leaf:
+            return "celery %s" %  colored(command)
+        return obj.list_commands(indent, "celery %s" % command, colored)
+
+    @classmethod
+    def list_commands(self, indent=0):
+        white = term.colored().white
+        ret = []
+        for cls, commands, color in command_classes:
+            ret.extend([
+                text.indent("[%s]" % white(cls), indent),
+                "\n".join(text.indent(
+                    self.get_command_info(command, indent, color), indent + 4)
+                        for command in commands),
+                "",
+            ])
+        return "\n".join(ret)
+
+
+
 
 def determine_exit_status(ret):
     if isinstance(ret, int):

+ 11 - 8
celery/worker/control.py

@@ -264,18 +264,20 @@ def shutdown(panel, msg="Got shutdown from remote", **kwargs):
 
 
 @Panel.register
-def add_consumer(panel, queue=None, exchange=None, exchange_type="direct",
+def add_consumer(panel, queue, exchange=None, exchange_type=None,
         routing_key=None, **options):
     cset = panel.consumer.task_consumer
+    exchange = queue if exchange is None else exchange
+    routing_key = queue if routing_key is None else routing_key
+    exchange_type = "direct" if exchange_type is None else exchange_type
     if not cset.consuming_from(queue):
-        declaration = dict(queue=queue,
-                           exchange=exchange,
-                           exchange_type=exchange_type,
-                           routing_key=routing_key,
-                           **options)
-        cset.add_consumer_from_dict(**declaration)
+        q = panel.app.amqp.queues.add(queue,
+                exchange=exchange,
+                exchange_type=exchange_type,
+                routing_key=routing_key, **options)
+        cset.add_queue(q)
         cset.consume()
-        logger.info("Started consuming from %r", declaration)
+        logger.info("Started consuming from %r", queue)
         return {"ok": "started consuming from %r" % (queue, )}
     else:
         return {"ok": "already consuming from %r" % (queue, )}
@@ -283,6 +285,7 @@ def add_consumer(panel, queue=None, exchange=None, exchange_type="direct",
 
 @Panel.register
 def cancel_consumer(panel, queue=None, **_):
+    panel.app.amqp.queues.select_remove(queue)
     cset = panel.consumer.task_consumer
     cset.cancel_by_queue(queue)
     return {"ok": "no longer consuming from %s" % (queue, )}

+ 2 - 2
docs/userguide/workers.rst

@@ -485,8 +485,8 @@ a worker using :program:`celery events`/:program:`celerymon`.
 
 .. code-block:: python
 
-    >>> celery.control.broadcast("enable_events")
-    >>> celery.control.broadcast("disable_events")
+    >>> celery.control.enable_events()
+    >>> celery.control.disable_events()
 
 .. _worker-autoreload: