Parcourir la source

celery.task.control: Broadcast now supports receiving replies.

Ask Solem il y a 15 ans
Parent
commit
7a1f81b7e7
1 fichiers modifiés avec 31 ajouts et 9 suppressions
  1. 31 9
      celery/task/control.py

+ 31 - 9
celery/task/control.py

@@ -1,5 +1,6 @@
 from celery import conf
-from celery.messaging import BroadcastPublisher
+from celery.utils import gen_unique_id
+from celery.messaging import BroadcastPublisher, ControlReplyConsumer
 from celery.messaging import with_connection, get_consumer_set
 
 
@@ -21,8 +22,7 @@ def discard_all(connection=None,
         consumers.close()
 
 
-def revoke(task_id, destination=None, connection=None,
-        connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
+def revoke(task_id, destination=None, **kwargs):
     """Revoke a task by id.
 
     If a task is revoked, the workers will ignore the task and not execute
@@ -35,14 +35,18 @@ def revoke(task_id, destination=None, connection=None,
         a connection will be established automatically.
     :keyword connect_timeout: Timeout for new connection if a custom
         connection is not provided.
+    :keyword reply: Wait for and return the reply.
+    :keyword timeout: Timeout in seconds to wait for the reply.
+    :keyword limit: Limit number of replies.
 
     """
     return broadcast("revoke", destination=destination,
-                               arguments={"task_id": task_id})
+                               arguments={"task_id": task_id}, **kwargs)
 
 
-def rate_limit(task_name, rate_limit, destination=None, connection=None,
-        connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
+
+
+def rate_limit(task_name, rate_limit, destination=None, **kwargs):
     """Set rate limit for task by type.
 
     :param task_name: Type of task to change rate limit for.
@@ -55,16 +59,21 @@ def rate_limit(task_name, rate_limit, destination=None, connection=None,
         a connection will be established automatically.
     :keyword connect_timeout: Timeout for new connection if a custom
         connection is not provided.
+    :keyword reply: Wait for and return the reply.
+    :keyword timeout: Timeout in seconds to wait for the reply.
+    :keyword limit: Limit number of replies.
 
     """
     return broadcast("rate_limit", destination=destination,
                                    arguments={"task_name": task_name,
-                                              "rate_limit": rate_limit})
+                                              "rate_limit": rate_limit},
+                                   **kwargs)
 
 
 @with_connection
 def broadcast(command, arguments=None, destination=None, connection=None,
-        connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
+        connect_timeout=conf.BROKER_CONNECTION_TIMEOUT, reply=False,
+        timeout=1, limit=None):
     """Broadcast a control command to the celery workers.
 
     :param command: Name of command to send.
@@ -75,12 +84,25 @@ def broadcast(command, arguments=None, destination=None, connection=None,
         a connection will be established automatically.
     :keyword connect_timeout: Timeout for new connection if a custom
         connection is not provided.
+    :keyword reply: Wait for and return the reply.
+    :keyword timeout: Timeout in seconds to wait for the reply.
+    :keyword limit: Limit number of replies.
 
     """
     arguments = arguments or {}
+    reply_ticket = reply and gen_unique_id() or None
+
 
     broadcast = BroadcastPublisher(connection)
     try:
-        broadcast.send(command, arguments, destination=destination)
+        broadcast.send(command, arguments, destination=destination,
+                       reply_ticket=reply_ticket)
     finally:
         broadcast.close()
+
+    if reply_ticket:
+        crq = ControlReplyConsumer(connection, reply_ticket)
+        try:
+            return crq.collect(limit=limit, timeout=timeout)
+        finally:
+            crq.close()