Browse Source

Emit warning if a node with the same name is already receiving from the process mailbox.

Ask Solem 14 years ago
parent
commit
2584e999d5
1 changed files with 24 additions and 2 deletions
  1. 24 2
      celery/messaging.py

+ 24 - 2
celery/messaging.py

@@ -4,6 +4,8 @@ Sending and Receiving Messages
 
 """
 import socket
+import warnings
+
 from datetime import datetime, timedelta
 from itertools import count
 
@@ -215,10 +217,30 @@ class BroadcastConsumer(Consumer):
     no_ack = True
 
     def __init__(self, *args, **kwargs):
-        hostname = kwargs.pop("hostname", None) or socket.gethostname()
-        self.queue = "%s_%s" % (self.queue, hostname)
+        self.hostname = kwargs.pop("hostname", None) or socket.gethostname()
+        self.queue = "%s_%s" % (self.queue, self.hostname)
         super(BroadcastConsumer, self).__init__(*args, **kwargs)
 
+    def verify_exclusive(self):
+        # XXX Kombu material
+        channel = getattr(self.backend, "channel")
+        if channel and hasattr(channel, "queue_declare"):
+            try:
+                _, _, consumers = channel.queue_declare(self.queue,
+                                                        passive=True)
+            except ValueError:
+                pass
+            else:
+                if consumers:
+                    warnings.warn(UserWarning(
+                        "A node named %s is already using this process "
+                        "mailbox. Maybe you should specify a custom name "
+                        "for this node with the -n argument?" % self.hostname))
+
+    def consume(self, *args, **kwargs):
+        self.verify_exclusive()
+        return super(BroadcastConsumer, self).consume(*args, **kwargs)
+
 
 def establish_connection(hostname=None, userid=None, password=None,
         virtual_host=None, port=None, ssl=None, insist=None,