Explorar o código

New remote control commands: add_consumer + cancel_consumer* add_consumer queue exchange exchange_type routing_key **options Tells the worker to declare and consume from the specified declaration.* cancel_consumer queue_name Tells the worker to stop consuming from queue (by queue name).Commands also added to celeryctl and celery.task.control.inspect

Ask Solem %!s(int64=14) %!d(string=hai) anos
pai
achega
816afda57f
Modificáronse 3 ficheiros con 35 adicións e 2 borrados
  1. 4 2
      celery/bin/celeryctl.py
  2. 9 0
      celery/task/control.py
  3. 22 0
      celery/worker/control/builtins.py

+ 4 - 2
celery/bin/celeryctl.py

@@ -185,7 +185,9 @@ class inspect(Command):
                "registered_tasks": 1.0,
                "enable_events": 1.0,
                "disable_events": 1.0,
-               "ping": 0.2}
+               "ping": 0.2,
+               "add_consumer": 1.0,
+               "cancel_consumer": 1.0,}
     option_list = Command.option_list + (
                 Option("--timeout", "-t", type="float", dest="timeout",
                     default=None,
@@ -224,7 +226,7 @@ class inspect(Command):
         i = inspect(destination=destination,
                     timeout=timeout,
                     callback=on_reply)
-        replies = getattr(i, command)()
+        replies = getattr(i, command)(*args[1:])
         if not replies:
             raise Error("No nodes replied within time constraint.")
         return replies

+ 9 - 0
celery/task/control.py

@@ -144,6 +144,15 @@ class inspect(object):
     def ping(self):
         return self._request("ping")
 
+    def add_consumer(self, queue, exchange=None, exchange_type="direct",
+            routing_key=None, **options):
+        return self._request("add_consumer", queue=queue, exchange=exchange,
+                             exchange_type=exchange_type,
+                             routing_key=routing_key, **options)
+
+    def cancel_consumer(self, queue, **kwargs):
+        return self._request("cancel_consumer", queue=queue, **kwargs)
+
 
 @with_connection
 def broadcast(command, arguments=None, destination=None, connection=None,

+ 22 - 0
celery/worker/control/builtins.py

@@ -184,3 +184,25 @@ def ping(panel, **kwargs):
 def shutdown(panel, **kwargs):
     panel.logger.critical("Got shutdown from remote.")
     raise SystemExit("Got shutdown from remote")
+
+
+@Panel.register
+def add_consumer(panel, queue=None, exchange=None, exchange_type="direct",
+        routing_key=None, **options):
+    cset = panel.listener.task_consumer
+    declaration = dict(queue=queue,
+                       exchange=exchange,
+                       exchange_type=exchange_type,
+                       routing_key=routing_key,
+                       **options)
+    cset.add_consumer_from_dict(**declaration)
+    cset.consume()
+    panel.logger.info("Started consuming from %r" % (declaration, ))
+    return {"ok": "started consuming from %s" % (queue, )}
+
+
+@Panel.register
+def cancel_consumer(panel, queue=None, **_):
+    cset = panel.listener.task_consumer
+    cset.cancel_by_queue(queue)
+    return {"ok": "no longer consuming from %s" % (queue, )}