Explorar o código

Use the same channel for consuming remote control commands and sending replies

Ask Solem %!s(int64=14) %!d(string=hai) anos
pai
achega
52c1c87834
Modificáronse 2 ficheiros con 6 adicións e 2 borrados
  1. 1 0
      celery/worker/consumer.py
  2. 5 2
      celery/worker/control/__init__.py

+ 1 - 0
celery/worker/consumer.py

@@ -403,6 +403,7 @@ class Consumer(object):
 
         self.broadcast_consumer = mailbox(self.connection).Node(self.hostname)
         self.broadcast_consumer.register_callback(self.receive_message)
+        self.control_dispatch.channel = self.broadcast_consumer.channel
 
         # Flush events sent while connection was down.
         if self.event_dispatcher:

+ 5 - 2
celery/worker/control/__init__.py

@@ -12,18 +12,21 @@ class ControlDispatch(object):
     """Execute worker control panel commands."""
     Panel = Panel
 
-    def __init__(self, logger=None, hostname=None, consumer=None, app=None):
+    def __init__(self, logger=None, hostname=None, consumer=None, app=None,
+            channel=None):
         self.app = app_or_default(app)
         self.logger = logger or self.app.log.get_default_logger()
         self.hostname = hostname or socket.gethostname()
         self.consumer = consumer
+        self.channel = channel
         self.panel = self.Panel(self.logger, self.consumer, self.hostname,
                                 app=self.app)
 
     def reply(self, data, exchange, routing_key, **kwargs):
 
         def _do_reply(connection=None, connect_timeout=None):
-            mailbox(connection).publish_reply(data, exchange, routing_key)
+            mailbox(connection).publish_reply(data, exchange, routing_key,
+                                              channel=self.channel)
 
         self.app.with_default_connection(_do_reply)(**kwargs)