Browse Source

Worker: Events must use connection_for_write (Issue #3525)

Ask Solem 8 years ago
parent
commit
6fd3bc0375
2 changed files with 23 additions and 6 deletions
  1. 12 2
      celery/worker/consumer/consumer.py
  2. 11 4
      celery/worker/consumer/events.py

+ 12 - 2
celery/worker/consumer/consumer.py

@@ -397,13 +397,22 @@ class Consumer(object):
             self.pool.flush()
             self.pool.flush()
 
 
     def connect(self):
     def connect(self):
-        """Establish the broker connection.
+        """Establish the broker connection used for consuming tasks.
 
 
         Retries establishing the connection if the
         Retries establishing the connection if the
         :setting:`broker_connection_retry` setting is enabled
         :setting:`broker_connection_retry` setting is enabled
         """
         """
-        conn = self.app.connection_for_read(heartbeat=self.amqheartbeat)
+        return self.connection_for_read(heartbeat=self.amqheartbeat)
 
 
+    def connection_for_read(self, heartbeat=None):
+        return self.ensure_connected(
+            self.app.connection_for_read(heartbeat=heartbeat))
+
+    def connection_for_write(self, heartbeat=None):
+        return self.ensure_connected(
+            self.app.connection_for_write(heartbeat=heartbeat))
+
+    def ensure_connected(self, conn):
         # Callback called for each retry while the connection
         # Callback called for each retry while the connection
         # can't be established.
         # can't be established.
         def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
         def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
@@ -427,6 +436,7 @@ class Consumer(object):
             conn.transport.register_with_event_loop(conn.connection, self.hub)
             conn.transport.register_with_event_loop(conn.connection, self.hub)
         return conn
         return conn
 
 
+
     def _flush_events(self):
     def _flush_events(self):
         if self.event_dispatcher:
         if self.event_dispatcher:
             self.event_dispatcher.flush()
             self.event_dispatcher.flush()

+ 11 - 4
celery/worker/consumer/events.py

@@ -15,8 +15,11 @@ class Events(bootsteps.StartStopStep):
 
 
     requires = (Connection,)
     requires = (Connection,)
 
 
-    def __init__(self, c, task_events=True,
+    def __init__(self, c,
-                 without_heartbeat=False, without_gossip=False, **kwargs):
+                 task_events=True,
+                 without_heartbeat=False,
+                 without_gossip=False,
+                 **kwargs):
         self.groups = None if task_events else ['worker']
         self.groups = None if task_events else ['worker']
         self.send_events = (
         self.send_events = (
             task_events or
             task_events or
@@ -30,8 +33,12 @@ class Events(bootsteps.StartStopStep):
         # flush events sent while connection was down.
         # flush events sent while connection was down.
         prev = self._close(c)
         prev = self._close(c)
         dis = c.event_dispatcher = c.app.events.Dispatcher(
         dis = c.event_dispatcher = c.app.events.Dispatcher(
-            c.connect(), hostname=c.hostname,
+            c.connection_for_write(),
-            enabled=self.send_events, groups=self.groups,
+            hostname=c.hostname,
+            enabled=self.send_events,
+            groups=self.groups,
+            # we currently only buffer events when the event loop is enabled
+            # XXX This excludes eventlet/gevent, which should actually buffer.
             buffer_group=['task'] if c.hub else None,
             buffer_group=['task'] if c.hub else None,
             on_send_buffered=c.on_send_event_buffered if c.hub else None,
             on_send_buffered=c.on_send_event_buffered if c.hub else None,
         )
         )