Parcourir la source

Users can now add their own remote control commands.

celery.worker.control has been refactored to use a global command
registry, and support for replies has been added.
Ask Solem il y a 15 ans
Parent
commit
2b37d5a878

+ 0 - 149
celery/worker/control.py

@@ -1,149 +0,0 @@
-import socket
-from datetime import datetime
-
-from celery import log
-from celery.registry import tasks
-from celery.worker.revoke import revoked
-
-TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
-
-
-def expose(fun):
-    """Expose method as a celery worker control command, allowed to be called
-    from a message."""
-    fun.exposed = True
-    return fun
-
-
-class Control(object):
-    """The worker control panel.
-
-    :param logger: The current logger to use.
-
-    """
-
-    def __init__(self, logger, hostname=None, listener=None):
-        assert listener is not None
-        self.logger = logger
-        self.hostname = hostname or socket.gethostname()
-        self.listener = listener
-
-    @expose
-    def revoke(self, task_id, **kwargs):
-        """Revoke task by task id."""
-        revoked.add(task_id)
-        self.logger.warn("Task %s revoked." % task_id)
-
-    @expose
-    def rate_limit(self, task_name, rate_limit, **kwargs):
-        """Set new rate limit for a task type.
-
-        See :attr:`celery.task.base.Task.rate_limit`.
-
-        :param task_name: Type of task.
-        :param rate_limit: New rate limit.
-
-        """
-        try:
-            tasks[task_name].rate_limit = rate_limit
-        except KeyError:
-            return
-
-        self.listener.ready_queue.refresh()
-
-        if not rate_limit:
-            self.logger.warn("Disabled rate limits for tasks of type %s" % (
-                                task_name))
-        else:
-            self.logger.warn("New rate limit for tasks of type %s: %s." % (
-                                task_name, rate_limit))
-
-    @expose
-    def shutdown(self, **kwargs):
-        self.logger.critical("Got shutdown from remote.")
-        raise SystemExit
-
-    @expose
-    def dump_schedule(self, **kwargs):
-        schedule = self.listener.eta_schedule
-        info = "--Empty Schedule--"
-        if schedule.queue:
-            formatitem = lambda (i, item): "%s. %s pri%s %r" % (i,
-                    datetime.fromtimestamp(item["eta"]),
-                    item["priority"],
-                    item["item"])
-            info = "\n".join(map(formatitem, enumerate(schedule.info())))
-        self.logger.warn("* Dump of current schedule:\n%s" % (info, ))
-
-    @expose
-    def dump_tasks(self, **kwargs):
-        from celery import registry
-
-        def _extract_info(task):
-            fields = dict((field, str(getattr(task, field, None)))
-                            for field in TASK_INFO_FIELDS
-                                if getattr(task, field, None) is not None)
-            info = map("=".join, fields.items())
-            if not info:
-                return "\t%s" % task.name
-            return "\t%s [%s]" % (task.name, " ".join(info))
-
-        tasks = sorted(registry.tasks.keys())
-        tasks = [registry.tasks[task] for task in tasks]
-
-        self.logger.warn("* Dump of currently registered tasks:\n%s" % (
-            "\n".join(map(_extract_info, tasks))))
-
-
-class ControlDispatch(object):
-    """Execute worker control panel commands."""
-
-    panel_cls = Control
-
-    def __init__(self, logger=None, hostname=None, listener=None):
-        self.logger = logger or log.get_default_logger()
-        self.hostname = hostname
-        self.listener = listener
-        self.panel = self.panel_cls(self.logger,
-                                    hostname=self.hostname,
-                                    listener=self.listener)
-
-    def dispatch_from_message(self, message):
-        """Dispatch by using message data received by the broker.
-
-        Example:
-
-            >>> def receive_message(message_data, message):
-            ...     control = message_data.get("control")
-            ...     if control:
-            ...         ControlDispatch().dispatch_from_message(control)
-
-        """
-        message = dict(message) # don't modify callers message.
-        command = message.pop("command")
-        destination = message.pop("destination", None)
-        if not destination or self.hostname in destination:
-            return self.execute(command, message)
-
-    def execute(self, command, kwargs=None):
-        """Execute control command by name and keyword arguments.
-
-        :param command: Name of the command to execute.
-        :param kwargs: Keyword arguments.
-
-        """
-        kwargs = kwargs or {}
-        control = None
-        try:
-            control = getattr(self.panel, command)
-        except AttributeError:
-            pass
-        if control is None or not control.exposed:
-            self.logger.error("No such control command: %s" % command)
-        else:
-            # need to make sure keyword arguments are not in unicode
-            # this should be fixed in newer Python's
-            # (see: http://bugs.python.org/issue4978)
-            kwargs = dict((k.encode('utf8'), v)
-                            for (k, v) in kwargs.iteritems())
-            return control(**kwargs)

+ 68 - 0
celery/worker/control/__init__.py

@@ -0,0 +1,68 @@
+from celery import log
+from celery.worker.control.registry import Panel
+from celery.worker.control import builtins
+from celery.messaging import ControlReplyPublisher, with_connection
+
+
+class ControlDispatch(object):
+    """Execute worker control panel commands."""
+    panel_cls = Panel
+
+    def __init__(self, logger=None, hostname=None, listener=None):
+        self.logger = logger or log.get_default_logger()
+        self.hostname = hostname
+        self.listener = listener
+        self.panel = self.panel_cls(self.logger, self.listener, self.hostname)
+
+    @with_connection
+    def reply(self, data, exchange, routing_key, connection=None,
+            connect_timeout=None):
+        crq = ControlReplyPublisher(connection, exchange=exchange)
+        try:
+            crq.send(data, routing_key=routing_key)
+        finally:
+            crq.close()
+
+    def dispatch_from_message(self, message):
+        """Dispatch by using message data received by the broker.
+
+        Example:
+
+            >>> def receive_message(message_data, message):
+            ...     control = message_data.get("control")
+            ...     if control:
+            ...         ControlDispatch().dispatch_from_message(control)
+
+        """
+        message = dict(message) # don't modify callers message.
+        command = message.pop("command")
+        destination = message.pop("destination", None)
+        reply_to = message.pop("reply_to", None)
+        if not destination or self.hostname in destination:
+            return self.execute(command, message, reply_to=reply_to)
+
+    def execute(self, command, kwargs=None, reply_to=None):
+        """Execute control command by name and keyword arguments.
+
+        :param command: Name of the command to execute.
+        :param kwargs: Keyword arguments.
+
+        """
+        kwargs = kwargs or {}
+        control = None
+        try:
+            control = self.panel[command]
+        except KeyError:
+            self.logger.error("No such control command: %s" % command)
+        else:
+            # need to make sure keyword arguments are not in unicode
+            # this should be fixed in newer Python's
+            # (see: http://bugs.python.org/issue4978)
+            kwargs = dict((k.encode("utf8"), v)
+                            for k, v in kwargs.iteritems())
+            reply = control(self.panel, **kwargs)
+            if reply_to:
+                self.reply({self.hostname: reply},
+                           exchange=reply_to["exchange"],
+                           routing_key=reply_to["routing_key"])
+            return reply

+ 92 - 0
celery/worker/control/builtins.py

@@ -0,0 +1,92 @@
+from datetime import datetime
+
+from celery.registry import tasks
+from celery.worker.revoke import revoked
+from celery.worker.control.registry import Panel
+
+TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
+
+
+@Panel.register
+def revoke(panel, task_id, **kwargs):
+    """Revoke task by task id."""
+    revoked.add(task_id)
+    panel.logger.warn("Task %s revoked" % (task_id, ))
+    return True
+
+
+@Panel.register
+def rate_limit(panel, task_name, rate_limit, **kwargs):
+    """Set new rate limit for a task type.
+
+    See :attr:`celery.task.base.Task.rate_limit`.
+
+    :param task_name: Type of task.
+    :param rate_limit: New rate limit.
+
+    """
+    try:
+        tasks[task_name].rate_limit = rate_limit
+    except KeyError:
+        panel.logger.error("Rate limit attempt for unknown task %s" % (
+            task_name, ))
+        return "unknown task"
+
+    panel.listener.ready_queue.refresh()
+
+    if not rate_limit:
+        panel.logger.warn("Disabled rate limits for tasks of type %s" % (
+                            task_name, ))
+    else:
+        panel.logger.warn("New rate limit for tasks of type %s: %s." % (
+                    task_name, rate_limit))
+    return True
+
+
+@Panel.register
+def dump_schedule(panel, **kwargs):
+    schedule = panel.listener.eta_schedule
+    info = ["--Empty Schedule--"]
+    if not schedule.queue:
+        panel.logger.info("--Empty schedule--")
+        return []
+
+    formatitem = lambda (i, item): "%s. %s pri%s %r" % (i,
+            datetime.fromtimestamp(item["eta"]),
+            item["priority"],
+            item["item"])
+    info = map(formatitem, enumerate(schedule.info()))
+    panel.logger.info("* Dump of current schedule:\n%s" % (
+                            "\n".join(info, )))
+    return info
+
+
+@Panel.register
+def dump_tasks(panel, **kwargs):
+
+    def _extract_info(task):
+        fields = dict((field, str(getattr(task, field, None)))
+                        for field in TASK_INFO_FIELDS
+                            if getattr(task, field, None) is not None)
+        info = map("=".join, fields.items())
+        if not info:
+            return task.name
+        return "%s [%s]" % (task.name, " ".join(info))
+
+    info = map(_extract_info, (tasks[task]
+                                        for task in sorted(tasks.keys())))
+    panel.logger.warn("* Dump of currently registered tasks:\n%s" % (
+                "\n".join(info)))
+
+    return info
+
+
+@Panel.register
+def ping(panel, **kwargs):
+    return "pong"
+
+
+@Panel.register
+def shutdown(panel, **kwargs):
+    panel.logger.critical("Got shutdown from remote.")
+    raise SystemExit

+ 21 - 0
celery/worker/control/registry.py

@@ -0,0 +1,21 @@
+from UserDict import UserDict
+
+
+class Panel(UserDict):
+    data = dict() # Global registry.
+
+    def __init__(self, logger, listener, hostname=None):
+        self.logger = logger
+        self.hostname = hostname
+        self.listener = listener
+
+    @classmethod
+    def register(cls, method, name=None):
+        cls.data[name or method.__name__] = method
+
+    @classmethod
+    def unregister(cls, name_or_method):
+        name = name_or_method
+        if not isinstance(name_or_method, basestring):
+            name = name_or_method.__name__
+        cls.data.pop(name)