Parcourir la source

celery.messaging: Added cosumer/publisher for control command reply queue.

Ask Solem il y a 15 ans
Parent
commit
e7ca126fb3
1 fichiers modifiés avec 50 ajouts et 1 suppressions
  1. 50 1
      celery/messaging.py

+ 50 - 1
celery/messaging.py

@@ -5,6 +5,7 @@ Sending and Receiving Messages
 """
 import socket
 from datetime import datetime, timedelta
+from itertools import count
 
 from carrot.connection import DjangoBrokerConnection
 from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
@@ -135,15 +136,55 @@ class EventConsumer(Consumer):
     no_ack = True
 
 
+class ControlReplyConsumer(Consumer):
+    exchange = "celerycrq"
+    exchange_type = "direct"
+    no_ack = True
+
+    def __init__(self, connection, ticket, **kwargs):
+        self.ticket = ticket
+        queue = "%s.%s" % (self.exchange, ticket)
+        super(ControlReplyConsumer, self).__init__(connection,
+                                                   queue=queue,
+                                                   routing_key=ticket,
+                                                   **kwargs)
+
+    def collect(self, limit=None, timeout=1):
+        responses = []
+
+        def callback(message_data, message):
+            responses.append(message_data)
+
+        self.callbacks = [callback]
+        it = self.iterconsume()
+        for i in limit and range(limit) or count():
+            try:
+                self.connection.drain_events(timeout=timeout)
+            except socket.timeout:
+                break
+        return responses
+
+
+class ControlReplyPublisher(Publisher):
+    exchange = "celerycrq"
+    exchange_type = "direct"
+
+
 class BroadcastPublisher(Publisher):
     """Publish broadcast commands"""
+
+    ReplyTo = ControlReplyConsumer
+
     exchange = conf.BROADCAST_EXCHANGE
     exchange_type = conf.BROADCAST_EXCHANGE_TYPE
 
-    def send(self, type, arguments, destination=None):
+    def send(self, type, arguments, destination=None, reply_ticket=None):
         """Send broadcast command."""
         arguments["command"] = type
         arguments["destination"] = destination
+        if reply_ticket:
+            arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
+                                     "routing_key": reply_ticket}
         super(BroadcastPublisher, self).send({"control": arguments})
 
 
@@ -202,3 +243,11 @@ def get_consumer_set(connection, queues=None, **options):
                             backend=cset.backend, **queue_options)
         cset.consumers.append(consumer)
     return cset
+
+
+@with_connection
+def reply(data, exchange, routing_key, connection=None, connect_timeout=None,
+        **kwargs):
+    pub = Publisher(connection, exchange=exchange,
+                    routing_key=routing_key, **kwargs)
+    pub.send(data)