Browse Source

Pidbox fixes

Ask Solem 14 years ago
parent
commit
207ed49b7c
3 changed files with 31 additions and 10 deletions
  1. 27 5
      celery/pidbox.py
  2. 3 3
      celery/task/control.py
  3. 1 2
      celery/worker/listener.py

+ 27 - 5
celery/pidbox.py

@@ -19,11 +19,13 @@ class Mailbox(object):
         self.exchange = Exchange("%s.pidbox" % (self.namespace, ),
         self.exchange = Exchange("%s.pidbox" % (self.namespace, ),
                                  type="fanout",
                                  type="fanout",
                                  durable=False,
                                  durable=False,
-                                 auto_delete=True)
+                                 auto_delete=True,
+                                 delivery_mode="transient")
         self.reply_exchange = Exchange("reply.%s.pidbox" % (self.namespace, ),
         self.reply_exchange = Exchange("reply.%s.pidbox" % (self.namespace, ),
                                  type="direct",
                                  type="direct",
                                  durable=False,
                                  durable=False,
-                                 auto_delete=True)
+                                 auto_delete=True,
+                                 delivery_mode="transient")
 
 
     def publish_reply(self, reply, exchange, routing_key, channel=None):
     def publish_reply(self, reply, exchange, routing_key, channel=None):
         chan = channel or self.connection.channel()
         chan = channel or self.connection.channel()
@@ -80,18 +82,38 @@ class Mailbox(object):
             arguments["reply_to"] = {"exchange": self.reply_exchange.name,
             arguments["reply_to"] = {"exchange": self.reply_exchange.name,
                                      "routing_key": reply_ticket}
                                      "routing_key": reply_ticket}
         chan = channel or self.connection.channel()
         chan = channel or self.connection.channel()
-        producer = Producer(exchange=self.exchange, delivery_mode="transient")
+        producer = Producer(chan, exchange=self.exchange)
         try:
         try:
             producer.publish({"control": arguments})
             producer.publish({"control": arguments})
         finally:
         finally:
             channel or chan.close()
             channel or chan.close()
 
 
-    def get_consumer(self, hostname, channel=None):
+    def Node(self, hostname, channel=None):
         return Consumer(channel or self.connection.channel(),
         return Consumer(channel or self.connection.channel(),
                         [self.get_queue(hostname)],
                         [self.get_queue(hostname)],
                         no_ack=True)
                         no_ack=True)
 
 
-    def broadcast(self, command, arguments=None, destination=None,
+    def call(self, destination, command, kwargs={}, timeout=None,
+            callback=None, channel=None):
+        return self._broadcast(command, kwargs, destination,
+                               reply=True, timeout=timeout,
+                               callback=callback,
+                               channel=channel)
+
+    def cast(self, destination, command, kwargs={}):
+        return self._broadcast(command, kwargs, destination, reply=False)
+
+    def abcast(self, command, kwargs={}):
+        return self._broadcast(command, kwargs, reply=False)
+
+    def multi_call(self, command, kwargs={}, timeout=1,
+            limit=None, callback=None, channel=None):
+        return self._broadcast(command, kwargs, reply=True,
+                               timeout=timeout, limit=limit,
+                               callback=callback,
+                               channel=channel)
+
+    def _broadcast(self, command, arguments=None, destination=None,
             reply=False, timeout=1, limit=None, callback=None, channel=None):
             reply=False, timeout=1, limit=None, callback=None, channel=None):
         arguments = arguments or {}
         arguments = arguments or {}
         reply_ticket = reply and gen_unique_id() or None
         reply_ticket = reply and gen_unique_id() or None

+ 3 - 3
celery/task/control.py

@@ -185,9 +185,9 @@ class Control(object):
 
 
         """
         """
         def _do_broadcast(connection=None, connect_timeout=None):
         def _do_broadcast(connection=None, connect_timeout=None):
-            return mailbox(connection).broadcast(command, arguments,
-                                                 destination, reply,
-                                                 timeout, limit, callback)
+            return mailbox(connection)._broadcast(command, arguments,
+                                                  destination, reply,
+                                                  timeout, limit, callback)
 
 
         return self.app.with_default_connection(_do_broadcast)(
         return self.app.with_default_connection(_do_broadcast)(
                 connection=connection, connect_timeout=connect_timeout)
                 connection=connection, connect_timeout=connect_timeout)

+ 1 - 2
celery/worker/listener.py

@@ -401,8 +401,7 @@ class CarrotListener(object):
         self.task_consumer.on_decode_error = self.on_decode_error
         self.task_consumer.on_decode_error = self.on_decode_error
         self.task_consumer.register_callback(self.receive_message)
         self.task_consumer.register_callback(self.receive_message)
 
 
-        self.broadcast_consumer = mailbox(self.connection).get_consumer(
-                                        self.hostname)
+        self.broadcast_consumer = mailbox(self.connection).Node(self.hostname)
         self.broadcast_consumer.register_callback(self.receive_message)
         self.broadcast_consumer.register_callback(self.receive_message)
 
 
         # Flush events sent while connection was down.
         # Flush events sent while connection was down.