Преглед на файлове

Make sure that Hub is reinitalized at consumer restart

Ask Solem преди 11 години
родител
ревизия
9d0c5c13c2
променени са 3 файла, в които са добавени 37 реда и са изтрити 20 реда
  1. 1 0
      celery/tests/worker/test_hub.py
  2. 30 20
      celery/tests/worker/test_loops.py
  3. 6 0
      celery/worker/loops.py

+ 1 - 0
celery/tests/worker/test_hub.py

@@ -321,6 +321,7 @@ class test_Hub(Case):
             self.assertTrue(hub.readers)
             self.assertTrue(hub.writers)
         finally:
+            assert hub.poller
             hub.close()
         self.assertFalse(hub.readers)
         self.assertFalse(hub.writers)

+ 30 - 20
celery/tests/worker/test_loops.py

@@ -222,64 +222,70 @@ class test_asynloop(AppCase):
         x.hub.timer._queue = [1]
         x.close_then_error(x.hub.poller.poll)
         x.hub.fire_timers.return_value = 33.37
-        x.hub.poller.poll.return_value = []
+        poller = x.hub.poller
+        poller.poll.return_value = []
         with self.assertRaises(socket.error):
             asynloop(*x.args)
-        x.hub.poller.poll.assert_called_with(33.37)
+        poller.poll.assert_called_with(33.37)
 
     def test_poll_readable(self):
         x = X(self.app)
         reader = Mock(name='reader')
         x.hub.add_reader(6, reader, 6)
         x.hub.on_tick.add(x.close_then_error(Mock(name='tick'), mod=4))
-        x.hub.poller.poll.return_value = [(6, READ)]
+        poller = x.hub.poller
+        poller.poll.return_value = [(6, READ)]
         with self.assertRaises(socket.error):
             asynloop(*x.args)
         reader.assert_called_with(6)
-        self.assertTrue(x.hub.poller.poll.called)
+        self.assertTrue(poller.poll.called)
 
     def test_poll_readable_raises_Empty(self):
         x = X(self.app)
         reader = Mock(name='reader')
         x.hub.add_reader(6, reader, 6)
         x.hub.on_tick.add(x.close_then_error(Mock(name='tick'), 2))
-        x.hub.poller.poll.return_value = [(6, READ)]
+        poller = x.hub.poller
+        poller.poll.return_value = [(6, READ)]
         reader.side_effect = Empty()
         with self.assertRaises(socket.error):
             asynloop(*x.args)
         reader.assert_called_with(6)
-        self.assertTrue(x.hub.poller.poll.called)
+        self.assertTrue(poller.poll.called)
 
     def test_poll_writable(self):
         x = X(self.app)
         writer = Mock(name='writer')
         x.hub.add_writer(6, writer, 6)
         x.hub.on_tick.add(x.close_then_error(Mock(name='tick'), 2))
-        x.hub.poller.poll.return_value = [(6, WRITE)]
+        poller = x.hub.poller
+        poller.poll.return_value = [(6, WRITE)]
         with self.assertRaises(socket.error):
             asynloop(*x.args)
         writer.assert_called_with(6)
-        self.assertTrue(x.hub.poller.poll.called)
+        self.assertTrue(poller.poll.called)
 
     def test_poll_writable_none_registered(self):
         x = X(self.app)
         writer = Mock(name='writer')
         x.hub.add_writer(6, writer, 6)
         x.hub.on_tick.add(x.close_then_error(Mock(name='tick'), 2))
-        x.hub.poller.poll.return_value = [(7, WRITE)]
+        poller = x.hub.poller
+        poller.poll.return_value = [(7, WRITE)]
         with self.assertRaises(socket.error):
             asynloop(*x.args)
-        self.assertTrue(x.hub.poller.poll.called)
+        self.assertTrue(poller.poll.called)
 
     def test_poll_unknown_event(self):
         x = X(self.app)
         writer = Mock(name='reader')
         x.hub.add_writer(6, writer, 6)
         x.hub.on_tick.add(x.close_then_error(Mock(name='tick'), 2))
-        x.hub.poller.poll.return_value = [(6, 0)]
+        poller = x.hub.poller
+        poller.poll.return_value = [(6, 0)]
         with self.assertRaises(socket.error):
             asynloop(*x.args)
-        self.assertTrue(x.hub.poller.poll.called)
+        self.assertTrue(poller.poll.called)
 
     def test_poll_keep_draining_disabled(self):
         x = X(self.app)
@@ -290,21 +296,23 @@ class test_asynloop(AppCase):
             poll.side_effect = socket.error()
         poll.side_effect = se
 
-        x.hub.poller.poll.return_value = [(6, 0)]
+        poller = x.hub.poller
+        poll.return_value = [(6, 0)]
         with self.assertRaises(socket.error):
             asynloop(*x.args)
-        self.assertTrue(x.hub.poller.poll.called)
+        self.assertTrue(poller.poll.called)
 
     def test_poll_err_writable(self):
         x = X(self.app)
         writer = Mock(name='writer')
         x.hub.add_writer(6, writer, 6, 48)
         x.hub.on_tick.add(x.close_then_error(Mock(), 2))
-        x.hub.poller.poll.return_value = [(6, ERR)]
+        poller = x.hub.poller
+        poller.poll.return_value = [(6, ERR)]
         with self.assertRaises(socket.error):
             asynloop(*x.args)
         writer.assert_called_with(6, 48)
-        self.assertTrue(x.hub.poller.poll.called)
+        self.assertTrue(poller.poll.called)
 
     def test_poll_write_generator(self):
         x = X(self.app)
@@ -359,18 +367,20 @@ class test_asynloop(AppCase):
         reader = Mock(name='reader')
         x.hub.add_reader(6, reader, 6, 24)
         x.hub.on_tick.add(x.close_then_error(Mock(), 2))
-        x.hub.poller.poll.return_value = [(6, ERR)]
+        poller = x.hub.poller
+        poller.poll.return_value = [(6, ERR)]
         with self.assertRaises(socket.error):
             asynloop(*x.args)
         reader.assert_called_with(6, 24)
-        self.assertTrue(x.hub.poller.poll.called)
+        self.assertTrue(poller.poll.called)
 
     def test_poll_raises_ValueError(self):
         x = X(self.app)
         x.hub.readers = {6: Mock()}
-        x.close_then_error(x.hub.poller.poll, exc=ValueError)
+        poller = x.hub.poller
+        x.close_then_error(poller.poll, exc=ValueError)
         asynloop(*x.args)
-        self.assertTrue(x.hub.poller.poll.called)
+        self.assertTrue(poller.poll.called)
 
 
 class test_synloop(AppCase):

+ 6 - 0
celery/worker/loops.py

@@ -25,6 +25,10 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
              heartbeat, clock, hbrate=2.0, RUN=RUN):
     """Non-blocking event loop consuming messages until connection is lost,
     or shutdown is requested."""
+    if hub.poller is None:
+        # TODO this must be cleaner, but works for now.
+        # here to make sure we have a epoll fd.
+        hub._create_poller()
 
     update_qos = qos.update
     readers, writers = hub.readers, hub.writers
@@ -79,6 +83,8 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
             error(
                 'Error cleaning up after event loop: %r', exc, exc_info=1,
             )
+        finally:
+            hub.poller = None
 
 
 def synloop(obj, connection, consumer, blueprint, hub, qos,