Browse Source

New remote control commands: add_consumer + cancel_consumer* add_consumer queue exchange exchange_type routing_key **options Tells the worker to declare and consume from the specified declaration.* cancel_consumer queue_name Tells the worker to stop consuming from queue (by queue name).Commands also added to celeryctl and celery.task.control.inspect

Ask Solem 14 years ago
parent
commit
2f71227e7a
3 changed files with 35 additions and 2 deletions
  1. 4 2
      celery/bin/celeryctl.py
  2. 9 0
      celery/task/control.py
  3. 22 0
      celery/worker/control/builtins.py

+ 4 - 2
celery/bin/celeryctl.py

@@ -175,7 +175,9 @@ class inspect(Command):
                "registered_tasks": 1.0,
                "enable_events": 1.0,
                "disable_events": 1.0,
-               "ping": 0.2}
+               "ping": 0.2,
+               "add_consumer": 1.0,
+               "cancel_consumer": 1.0,}
     option_list = Command.option_list + (
                 Option("--timeout", "-t", type="float", dest="timeout",
                     default=None,
@@ -213,7 +215,7 @@ class inspect(Command):
         i = self.app.control.inspect(destination=destination,
                                      timeout=timeout,
                                      callback=on_reply)
-        replies = getattr(i, command)()
+        replies = getattr(i, command)(*args[1:])
         if not replies:
             raise Error("No nodes replied within time constraint.")
         return replies

+ 9 - 0
celery/task/control.py

@@ -61,6 +61,15 @@ class Inspect(object):
     def ping(self):
         return self._request("ping")
 
+    def add_consumer(self, queue, exchange=None, exchange_type="direct",
+            routing_key=None, **options):
+        return self._request("add_consumer", queue=queue, exchange=exchange,
+                             exchange_type=exchange_type,
+                             routing_key=routing_key, **options)
+
+    def cancel_consumer(self, queue, **kwargs):
+        return self._request("cancel_consumer", queue=queue, **kwargs)
+
 
 class Control(object):
 

+ 22 - 0
celery/worker/control/builtins.py

@@ -182,3 +182,25 @@ def ping(panel, **kwargs):
 def shutdown(panel, **kwargs):
     panel.logger.critical("Got shutdown from remote.")
     raise SystemExit("Got shutdown from remote")
+
+
+@Panel.register
+def add_consumer(panel, queue=None, exchange=None, exchange_type="direct",
+        routing_key=None, **options):
+    cset = panel.listener.task_consumer
+    declaration = dict(queue=queue,
+                       exchange=exchange,
+                       exchange_type=exchange_type,
+                       routing_key=routing_key,
+                       **options)
+    cset.add_consumer_from_dict(**declaration)
+    cset.consume()
+    panel.logger.info("Started consuming from %r" % (declaration, ))
+    return {"ok": "started consuming from %s" % (queue, )}
+
+
+@Panel.register
+def cancel_consumer(panel, queue=None, **_):
+    cset = panel.listener.task_consumer
+    cset.cancel_by_queue(queue)
+    return {"ok": "no longer consuming from %s" % (queue, )}