Browse Source

control.broadcast: Added callback argument to broadcast(), to process replies immediately as they arrive.

Ask Solem 14 years ago
parent
commit
a16e515842
2 changed files with 13 additions and 6 deletions
  1. 5 3
      celery/messaging.py
  2. 8 3
      celery/task/control.py

+ 5 - 3
celery/messaging.py

@@ -162,13 +162,15 @@ class ControlReplyConsumer(Consumer):
                                                    routing_key=ticket,
                                                    **kwargs)
 
-    def collect(self, limit=None, timeout=1):
+    def collect(self, limit=None, timeout=1, callback=None):
         responses = []
 
-        def callback(message_data, message):
+        def on_message(message_data, message):
+            if callback:
+                callback(message_data)
             responses.append(message_data)
 
-        self.callbacks = [callback]
+        self.callbacks = [on_message]
         self.consume()
         for i in limit and range(limit) or count():
             try:

+ 8 - 3
celery/task/control.py

@@ -97,9 +97,10 @@ def flatten_reply(reply):
 
 class inspect(object):
 
-    def __init__(self, destination=None, timeout=1):
+    def __init__(self, destination=None, timeout=1, callback=None):
         self.destination = destination
         self.timeout = timeout
+        self.callback = callback
 
     def _prepare(self, reply):
         if not reply:
@@ -113,6 +114,7 @@ class inspect(object):
     def _request(self, command, **kwargs):
         return self._prepare(broadcast(command, arguments=kwargs,
                                       destination=self.destination,
+                                      callback=self.callback,
                                       timeout=self.timeout, reply=True))
 
     def active(self, safe=False):
@@ -150,7 +152,7 @@ class inspect(object):
 @with_connection
 def broadcast(command, arguments=None, destination=None, connection=None,
         connect_timeout=conf.BROKER_CONNECTION_TIMEOUT, reply=False,
-        timeout=1, limit=None):
+        timeout=1, limit=None, callback=None):
     """Broadcast a control command to the celery workers.
 
     :param command: Name of command to send.
@@ -164,6 +166,8 @@ def broadcast(command, arguments=None, destination=None, connection=None,
     :keyword reply: Wait for and return the reply.
     :keyword timeout: Timeout in seconds to wait for the reply.
     :keyword limit: Limit number of replies.
+    :keyword callback: Callback called immediately for each reply
+        received.
 
     """
     arguments = arguments or {}
@@ -187,6 +191,7 @@ def broadcast(command, arguments=None, destination=None, connection=None,
     if reply_ticket:
         crq = ControlReplyConsumer(connection, reply_ticket)
         try:
-            return crq.collect(limit=limit, timeout=timeout)
+            return crq.collect(limit=limit, timeout=timeout,
+                               callback=callback)
         finally:
             crq.close()