Browse Source

Worker: Fixes heartbeats for gevent/eventlet. Closes #3649

Ask Solem 8 years ago
parent
commit
9b6d03f032

+ 4 - 3
celery/worker/consumer/consumer.py

@@ -409,8 +409,11 @@ class Consumer(object):
             self.app.connection_for_read(heartbeat=heartbeat))
 
     def connection_for_write(self, heartbeat=None):
-        return self.ensure_connected(
+        conn = self.ensure_connected(
             self.app.connection_for_write(heartbeat=heartbeat))
+        if self.hub:
+            conn.transport.register_with_event_loop(conn.connection, self.hub)
+        return conn
 
     def ensure_connected(self, conn):
         # Callback called for each retry while the connection
@@ -432,8 +435,6 @@ class Consumer(object):
             _error_handler, self.app.conf.broker_connection_max_retries,
             callback=maybe_shutdown,
         )
-        if self.hub:
-            conn.transport.register_with_event_loop(conn.connection, self.hub)
         return conn
 
     def _flush_events(self):

+ 1 - 1
celery/worker/pidbox.py

@@ -105,7 +105,7 @@ class gPidbox(Pidbox):
         shutdown = self._node_shutdown = threading.Event()
         stopped = self._node_stopped = threading.Event()
         try:
-            with c.connect() as connection:
+            with c.connection_for_read() as connection:
                 info('pidbox: Connected to %s.', connection.as_uri())
                 self._do_reset(c, connection)
                 while not shutdown.is_set() and c.connection:

+ 2 - 1
t/unit/worker/test_control.py

@@ -97,7 +97,8 @@ class test_Pidbox_green:
 
     def test_loop(self):
         parent = Mock()
-        conn = parent.connect.return_value = self.app.connection_for_read()
+        conn = self.app.connection_for_read()
+        parent.connection_for_read.return_value = conn
         drain = conn.drain_events = Mock()
         g = gPidbox(parent)
         parent.connection = Mock()

+ 1 - 1
t/unit/worker/test_worker.py

@@ -613,7 +613,7 @@ class test_Consumer(ConsumerCase):
             def close(self):
                 self.closed = True
 
-        c.connect = lambda: Connection(obj=c)
+        c.connection_for_read = lambda: Connection(obj=c)
         controller = find_step(c, consumer.Control)
         controller.box.loop(c)