Browse Source

98% coverage for celery.worker.loops

Ask Solem 12 years ago
parent
commit
bd4bf77213

+ 0 - 1
celery/tests/app/test_log.py

@@ -22,7 +22,6 @@ from celery.tests.utils import (
 )
 
 
-
 class test_TaskFormatter(Case):
 
     def test_no_task(self):

+ 3 - 1
celery/tests/app/test_routes.py

@@ -106,7 +106,9 @@ class test_lookup_route(RouteCase):
     def test_expands_queue_in_options(self):
         with _queues(self.app):
             R = routes.prepare(())
-            router = Router(self.app, R, self.app.amqp.queues, create_missing=True)
+            router = Router(
+                self.app, R, self.app.amqp.queues, create_missing=True,
+            )
             # apply_async forwards all arguments, even exchange=None etc,
             # so need to make sure it's merged correctly.
             route = router.route(

+ 1 - 1
celery/tests/concurrency/test_processes.py

@@ -206,7 +206,7 @@ class test_AsynPool(PoolCase):
 
     def test_promise(self):
         fun = Mock()
-        x = mp.promise(fun, 1, foo=1)
+        x = mp.promise(fun, (1, ), {'foo': 1})
         x()
         self.assertTrue(x.ready)
         fun.assert_called_with(1, foo=1)

+ 22 - 0
celery/tests/utils.py

@@ -18,6 +18,7 @@ import time
 import warnings
 
 from contextlib import contextmanager
+from datetime import timedelta
 from functools import partial, wraps
 from types import ModuleType
 
@@ -588,3 +589,24 @@ def skip_if_jython(fun):
             raise SkipTest('does not work on Jython')
         return fun(*args, **kwargs)
     return _inner
+
+
+def body_from_sig(app, sig, utc=True):
+    sig._freeze()
+    callbacks = sig.options.pop('link', None)
+    errbacks = sig.options.pop('link_error', None)
+    countdown = sig.options.pop('countdown', None)
+    if countdown:
+        sig.options['eta'] = app.now() + timedelta(seconds=countdown)
+    eta = sig.options.pop('eta', None)
+    eta = eta.isoformat() if eta else None
+    return {
+        'task': sig.task,
+        'id': sig.id,
+        'args': sig.args,
+        'kwargs': sig.kwargs,
+        'callbacks': [dict(s) for s in callbacks] if callbacks else None,
+        'errbacks': [dict(s) for s in errbacks] if errbacks else None,
+        'eta': eta,
+        'utc': utc,
+    }

+ 7 - 7
celery/tests/worker/test_control.py

@@ -22,7 +22,7 @@ from celery.worker.job import TaskRequest
 from celery.worker.state import revoked
 from celery.worker.control import Panel
 from celery.worker.pidbox import Pidbox, gPidbox
-from celery.tests.utils import AppCase, Case
+from celery.tests.utils import AppCase
 
 hostname = socket.gethostname()
 
@@ -60,14 +60,14 @@ class test_Pidbox(AppCase):
     def test_shutdown(self):
         with patch('celery.worker.pidbox.ignore_errors') as eig:
             parent = Mock()
-            pidbox = Pidbox(parent)
-            pidbox._close_channel = Mock()
-            self.assertIs(pidbox.c, parent)
-            pconsumer = pidbox.consumer = Mock()
+            pbox = Pidbox(parent)
+            pbox._close_channel = Mock()
+            self.assertIs(pbox.c, parent)
+            pconsumer = pbox.consumer = Mock()
             cancel = pconsumer.cancel
-            pidbox.shutdown(parent)
+            pbox.shutdown(parent)
             eig.assert_called_with(parent, cancel)
-            pidbox._close_channel.assert_called_with(parent)
+            pbox._close_channel.assert_called_with(parent)
 
 
 class test_Pidbox_green(AppCase):

+ 4 - 1
celery/tests/worker/test_hub.py

@@ -246,7 +246,8 @@ class test_Hub(Case):
         on_close = Mock()
         hub.on_close.append(on_close)
 
-        with hub:
+        hub.init()
+        try:
             hub.init.assert_called_with()
 
             read_A = Mock()
@@ -257,6 +258,8 @@ class test_Hub(Case):
             hub.update_writers({20: write_A, File(21): write_B})
             self.assertTrue(hub.readers)
             self.assertTrue(hub.writers)
+        finally:
+            hub.close()
         self.assertFalse(hub.readers)
         self.assertFalse(hub.writers)
 

+ 367 - 0
celery/tests/worker/test_loops.py

@@ -0,0 +1,367 @@
+from __future__ import absolute_import
+
+import socket
+
+from mock import Mock
+
+from celery.exceptions import InvalidTaskError, SystemTerminate
+from celery.five import Empty
+from celery.worker import state
+from celery.worker.loops import asynloop, synloop, CLOSE, READ, WRITE, ERR
+
+from celery.tests.utils import AppCase, body_from_sig
+
+
+class X(object):
+
+    def __init__(self, heartbeat=None, on_task=None):
+        (
+            self.obj,
+            self.connection,
+            self.consumer,
+            self.strategies,
+            self.blueprint,
+            self.hub,
+            self.qos,
+            self.heartbeat,
+            self.handle_unknown_message,
+            self.handle_unknown_task,
+            self.handle_invalid_task,
+            self.clock,
+        ) = self.args = [Mock(name='obj'),
+                         Mock(name='connection'),
+                         Mock(name='consumer'),
+                         {},
+                         Mock(name='blueprint'),
+                         Mock(name='Hub'),
+                         Mock(name='qos'),
+                         heartbeat,
+                         Mock(name='handle_unknown_message'),
+                         Mock(name='handle_unknown_task'),
+                         Mock(name='handle_invalid_task'),
+                         Mock(name='clock')]
+        self.connection.supports_heartbeats = True
+        self.consumer.callbacks = []
+        self.connection.connection_errors = (socket.error, )
+        #hent = self.Hub.__enter__ = Mock(name='Hub.__enter__')
+        #self.Hub.__exit__ = Mock(name='Hub.__exit__')
+        #self.hub = hent.return_value = Mock(name='hub_context')
+        self.hub.on_task = on_task or []
+        self.hub.readers = {}
+        self.hub.writers = {}
+        self.Hub = self.hub
+
+    def timeout_then_error(self, mock):
+
+        def first(*args, **kwargs):
+            mock.side_effect = socket.error()
+            self.connection.more_to_read = False
+            raise socket.timeout()
+        mock.side_effect = first
+
+    def close_then_error(self, mock):
+
+        def first(*args, **kwargs):
+            self.close()
+            self.connection.more_to_read = False
+            raise socket.error()
+        mock.side_effect = first
+
+    def close(self, *args, **kwargs):
+        self.blueprint.state = CLOSE
+
+    def closer(self, mock=None):
+        mock = Mock() if mock is None else mock
+        mock.side_effect = self.close
+        return mock
+
+
+def get_task_callback(*args, **kwargs):
+    x = X(*args, **kwargs)
+    x.blueprint.state = CLOSE
+    asynloop(*x.args)
+    return x, x.consumer.callbacks[0]
+
+
+class test_asynloop(AppCase):
+
+    def setup(self):
+
+        @self.app.task()
+        def add(x, y):
+            return x + y
+        self.add = add
+
+    def test_setup_heartbeat(self):
+        x = X(heartbeat=10)
+        x.blueprint.state = CLOSE
+        asynloop(*x.args)
+        x.consumer.consume.assert_called_with()
+        x.obj.on_ready.assert_called_with()
+        x.hub.timer.apply_interval.assert_called_with(
+            10 * 1000.0 / 2.0, x.connection.heartbeat_check, (2.0, ),
+        )
+
+    def task_context(self, sig, **kwargs):
+        x, on_task = get_task_callback(**kwargs)
+        body = body_from_sig(self.app, sig)
+        message = Mock()
+        strategy = x.strategies[sig.task] = Mock()
+        return x, on_task, body, message, strategy
+
+    def test_on_task_received(self):
+        _, on_task, body, msg, strategy = self.task_context(self.add.s(2, 2))
+        on_task(body, msg)
+        strategy.assert_called_with(msg, body, msg.ack_log_error)
+
+    def test_on_task_received_executes_hub_on_task(self):
+        cbs = [Mock(), Mock(), Mock()]
+        _, on_task, body, msg, _ = self.task_context(
+            self.add.s(2, 2), on_task=cbs,
+        )
+        on_task(body, msg)
+        [cb.assert_called_with() for cb in cbs]
+
+    def test_on_task_message_missing_name(self):
+        x, on_task, body, msg, strategy = self.task_context(self.add.s(2, 2))
+        body.pop('task')
+        on_task(body, msg)
+        x.handle_unknown_message.assert_called_with(body, msg)
+
+    def test_on_task_not_registered(self):
+        x, on_task, body, msg, strategy = self.task_context(self.add.s(2, 2))
+        exc = strategy.side_effect = KeyError(self.add.name)
+        on_task(body, msg)
+        x.handle_unknown_task.assert_called_with(body, msg, exc)
+
+    def test_on_task_InvalidTaskError(self):
+        x, on_task, body, msg, strategy = self.task_context(self.add.s(2, 2))
+        exc = strategy.side_effect = InvalidTaskError()
+        on_task(body, msg)
+        x.handle_invalid_task.assert_called_with(body, msg, exc)
+
+    def test_should_terminate(self):
+        x = X()
+        # XXX why aren't the errors propagated?!?
+        state.should_terminate = True
+        try:
+            with self.assertRaises(SystemTerminate):
+                asynloop(*x.args)
+        finally:
+            state.should_terminate = False
+
+    def test_should_terminate_hub_close_raises(self):
+        x = X()
+        # XXX why aren't the errors propagated?!?
+        state.should_terminate = True
+        x.hub.close.side_effect = MemoryError()
+        try:
+            with self.assertRaises(SystemTerminate):
+                asynloop(*x.args)
+        finally:
+            state.should_terminate = False
+
+    def test_should_stop(self):
+        x = X()
+        state.should_stop = True
+        try:
+            with self.assertRaises(SystemExit):
+                asynloop(*x.args)
+        finally:
+            state.should_stop = False
+
+    def test_updates_qos(self):
+        x = X()
+        x.qos.prev = 3
+        x.qos.value = 3
+        asynloop(*x.args, sleep=x.closer())
+        self.assertFalse(x.qos.update.called)
+
+        x = X()
+        x.qos.prev = 1
+        x.qos.value = 6
+        asynloop(*x.args, sleep=x.closer())
+        x.qos.update.assert_called_with()
+        x.hub.fire_timers.assert_called_with(propagate=(socket.error, ))
+        x.connection.transport.on_poll_start.assert_called_with()
+
+    def test_poll_empty(self):
+        x = X()
+        x.hub.readers = {6: Mock()}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.fire_timers.return_value = 33.37
+        x.hub.poller.poll.return_value = []
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        x.hub.poller.poll.assert_called_with(33.37)
+        x.connection.transport.on_poll_empty.assert_called_with()
+
+    def test_poll_readable(self):
+        x = X()
+        x.hub.readers = {6: Mock()}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(6, READ)]
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        x.hub.readers[6].assert_called_with(6, READ)
+        self.assertTrue(x.hub.poller.poll.called)
+
+    def test_poll_readable_raises_Empty(self):
+        x = X()
+        x.hub.readers = {6: Mock()}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(6, READ)]
+        x.hub.readers[6].side_effect = Empty()
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        x.hub.readers[6].assert_called_with(6, READ)
+        self.assertTrue(x.hub.poller.poll.called)
+
+    def test_poll_writable(self):
+        x = X()
+        x.hub.writers = {6: Mock()}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(6, WRITE)]
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        x.hub.writers[6].assert_called_with(6, WRITE)
+        self.assertTrue(x.hub.poller.poll.called)
+
+    def test_poll_writable_none_registered(self):
+        x = X()
+        x.hub.writers = {6: Mock()}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(7, WRITE)]
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        self.assertTrue(x.hub.poller.poll.called)
+
+    def test_poll_unknown_event(self):
+        x = X()
+        x.hub.writers = {6: Mock()}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(6, 0)]
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        self.assertTrue(x.hub.poller.poll.called)
+
+    def test_poll_keep_draining_disabled(self):
+        x = X()
+        x.hub.writers = {6: Mock()}
+        poll = x.hub.poller.poll
+
+        def se(*args, **kwargs):
+            poll.side_effect = socket.error()
+        poll.side_effect = se
+
+        x.connection.transport.nb_keep_draining = False
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(6, 0)]
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        self.assertTrue(x.hub.poller.poll.called)
+        self.assertFalse(x.connection.drain_nowait.called)
+
+    def test_poll_err_writable(self):
+        x = X()
+        x.hub.writers = {6: Mock()}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(6, ERR)]
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        x.hub.writers[6].assert_called_with(6, ERR)
+        self.assertTrue(x.hub.poller.poll.called)
+
+    def test_poll_write_generator(self):
+        x = X()
+
+        def Gen():
+            yield 1
+            yield 2
+        gen = Gen()
+
+        x.hub.writers = {6: gen}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(6, WRITE)]
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        self.assertTrue(gen.gi_frame.f_lasti != -1)
+        self.assertFalse(x.hub.remove.called)
+
+    def test_poll_write_generator_stopped(self):
+        x = X()
+
+        def Gen():
+            raise StopIteration()
+            yield
+        gen = Gen()
+        x.hub.writers = {6: gen}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(6, WRITE)]
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        self.assertIsNone(gen.gi_frame)
+        x.hub.remove.assert_called_with(6)
+
+    def test_poll_write_generator_raises(self):
+        x = X()
+
+        def Gen():
+            raise ValueError('foo')
+            yield
+        gen = Gen()
+        x.hub.writers = {6: gen}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(6, WRITE)]
+        with self.assertRaises(ValueError):
+            asynloop(*x.args)
+        self.assertIsNone(gen.gi_frame)
+        x.hub.remove.assert_called_with(6)
+
+    def test_poll_err_readable(self):
+        x = X()
+        x.hub.readers = {6: Mock()}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.return_value = [(6, ERR)]
+        with self.assertRaises(socket.error):
+            asynloop(*x.args)
+        x.hub.readers[6].assert_called_with(6, ERR)
+        self.assertTrue(x.hub.poller.poll.called)
+
+    def test_poll_raises_ValueError(self):
+        x = X()
+        x.hub.readers = {6: Mock()}
+        x.close_then_error(x.connection.drain_nowait)
+        x.hub.poller.poll.side_effect = ValueError()
+        asynloop(*x.args)
+        self.assertTrue(x.hub.poller.poll.called)
+
+
+class test_synloop(AppCase):
+
+    def test_timeout_ignored(self):
+        x = X()
+        x.timeout_then_error(x.connection.drain_events)
+        with self.assertRaises(socket.error):
+            synloop(*x.args)
+        self.assertEqual(x.connection.drain_events.call_count, 2)
+
+    def test_updates_qos_when_changed(self):
+        x = X()
+        x.qos.prev = 2
+        x.qos.value = 2
+        x.timeout_then_error(x.connection.drain_events)
+        with self.assertRaises(socket.error):
+            synloop(*x.args)
+        self.assertFalse(x.qos.update.called)
+
+        x.qos.value = 4
+        x.timeout_then_error(x.connection.drain_events)
+        with self.assertRaises(socket.error):
+            synloop(*x.args)
+        x.qos.update.assert_called_with()
+
+    def test_ignores_socket_errors_when_closed(self):
+        x = X()
+        x.close_then_error(x.connection.drain_events)
+        self.assertIsNone(synloop(*x.args))

+ 1 - 23
celery/tests/worker/test_strategy.py

@@ -2,7 +2,6 @@ from __future__ import absolute_import
 
 from collections import defaultdict
 from contextlib import contextmanager
-from datetime import timedelta
 from mock import Mock, patch
 
 from kombu.utils.limits import TokenBucket
@@ -11,28 +10,7 @@ from celery import Celery
 from celery.worker import state
 from celery.utils.timeutils import rate
 
-from celery.tests.utils import AppCase
-
-
-def body_from_sig(app, sig, utc=True):
-    sig._freeze()
-    callbacks = sig.options.pop('link', None)
-    errbacks = sig.options.pop('link_error', None)
-    countdown = sig.options.pop('countdown', None)
-    if countdown:
-        sig.options['eta'] = app.now() + timedelta(seconds=countdown)
-    eta = sig.options.pop('eta', None)
-    eta = eta.isoformat() if eta else None
-    return {
-        'task': sig.task,
-        'id': sig.id,
-        'args': sig.args,
-        'kwargs': sig.kwargs,
-        'callbacks': [dict(s) for s in callbacks] if callbacks else None,
-        'errbacks': [dict(s) for s in errbacks] if errbacks else None,
-        'eta': eta,
-        'utc': utc,
-    }
+from celery.tests.utils import AppCase, body_from_sig
 
 
 class test_default_strategy(AppCase):

+ 0 - 5
celery/worker/hub.py

@@ -232,10 +232,6 @@ class Hub(object):
         self.readers.pop(fd, None)
         self.writers.pop(fd, None)
 
-    def __enter__(self):
-        self.init()
-        return self
-
     def close(self, *args):
         [self._unregister(fd) for fd in self.readers]
         self.readers.clear()
@@ -243,7 +239,6 @@ class Hub(object):
         self.writers.clear()
         for callback in self.on_close:
             callback(self)
-    __exit__ = close
 
     def _repr_readers(self):
         return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR))

+ 62 - 49
celery/worker/loops.py

@@ -17,60 +17,65 @@ from kombu.utils.eventio import READ, WRITE, ERR
 from celery.bootsteps import CLOSE
 from celery.exceptions import InvalidTaskError, SystemTerminate
 from celery.five import Empty
+from celery.utils.log import get_logger
 
 from . import state
 
+logger = get_logger(__name__)
+error = logger.error
 
-def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
+
+def asynloop(obj, connection, consumer, strategies, blueprint, hub, qos,
              heartbeat, handle_unknown_message, handle_unknown_task,
              handle_invalid_task, clock, hbrate=2.0,
              sleep=sleep, min=min, Empty=Empty):
     """Non-blocking eventloop consuming messages until connection is lost,
     or shutdown is requested."""
 
-    with hub as hub:
-        update_qos = qos.update
-        update_readers = hub.update_readers
-        readers, writers = hub.readers, hub.writers
-        poll = hub.poller.poll
-        fire_timers = hub.fire_timers
-        hub_add = hub.add
-        hub_remove = hub.remove
-        scheduled = hub.timer._queue
-        hbtick = connection.heartbeat_check
-        conn_poll_start = connection.transport.on_poll_start
-        conn_poll_empty = connection.transport.on_poll_empty
-        pool_poll_start = obj.pool.on_poll_start
-        drain_nowait = connection.drain_nowait
-        on_task_callbacks = hub.on_task
-        keep_draining = connection.transport.nb_keep_draining
-        errors = connection.connection_errors
-        hub_add, hub_remove = hub.add, hub.remove
-
-        if heartbeat and connection.supports_heartbeats:
-            hub.timer.apply_interval(
-                heartbeat * 1000.0 / hbrate, hbtick, (hbrate, ))
-
-        def on_task_received(body, message):
-            if on_task_callbacks:
-                [callback() for callback in on_task_callbacks]
-            try:
-                name = body['task']
-            except (KeyError, TypeError):
-                return handle_unknown_message(body, message)
-
-            try:
-                strategies[name](message, body, message.ack_log_error)
-            except KeyError as exc:
-                handle_unknown_task(body, message, exc)
-            except InvalidTaskError as exc:
-                handle_invalid_task(body, message, exc)
-
-        consumer.callbacks = [on_task_received]
-        consumer.consume()
-        obj.on_ready()
-
-        while ns.state != CLOSE and obj.connection:
+    hub.init()
+    update_qos = qos.update
+    update_readers = hub.update_readers
+    readers, writers = hub.readers, hub.writers
+    poll = hub.poller.poll
+    fire_timers = hub.fire_timers
+    hub_add = hub.add
+    hub_remove = hub.remove
+    scheduled = hub.timer._queue
+    hbtick = connection.heartbeat_check
+    conn_poll_start = connection.transport.on_poll_start
+    conn_poll_empty = connection.transport.on_poll_empty
+    pool_poll_start = obj.pool.on_poll_start
+    drain_nowait = connection.drain_nowait
+    on_task_callbacks = hub.on_task
+    keep_draining = connection.transport.nb_keep_draining
+    errors = connection.connection_errors
+    hub_add, hub_remove = hub.add, hub.remove
+
+    def on_task_received(body, message):
+        if on_task_callbacks:
+            [callback() for callback in on_task_callbacks]
+        try:
+            name = body['task']
+        except (KeyError, TypeError):
+            return handle_unknown_message(body, message)
+
+        try:
+            strategies[name](message, body, message.ack_log_error)
+        except KeyError as exc:
+            handle_unknown_task(body, message, exc)
+        except InvalidTaskError as exc:
+            handle_invalid_task(body, message, exc)
+
+    if heartbeat and connection.supports_heartbeats:
+        hub.timer.apply_interval(
+            heartbeat * 1000.0 / hbrate, hbtick, (hbrate, ))
+
+    consumer.callbacks = [on_task_received]
+    consumer.consume()
+    obj.on_ready()
+
+    try:
+        while blueprint.state != CLOSE and obj.connection:
             # shutdown if signal handlers told us to.
             if state.should_stop:
                 raise SystemExit()
@@ -99,6 +104,7 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
                         #print('[[[EV]]]: %s' % (hub.repr_events(events), ))
                     except ValueError:  # Issue 882
                         return
+
                     if not events:
                         conn_poll_empty()
                     for fileno, event in events or ():
@@ -130,7 +136,7 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
                                 except Empty:
                                     continue
                         except socket.error:
-                            if ns.state != CLOSE:  # pragma: no cover
+                            if blueprint.state != CLOSE:  # pragma: no cover
                                 raise
                     if keep_draining:
                         drain_nowait()
@@ -140,9 +146,16 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
             else:
                 # no sockets yet, startup is probably not done.
                 sleep(min(poll_timeout, 0.1))
+    finally:
+        try:
+            hub.close()
+        except Exception as exc:
+            error(
+                'Error cleaning up after eventloop: %r', exc, exc_info=1,
+            )
 
 
-def synloop(obj, connection, consumer, strategies, ns, hub, qos,
+def synloop(obj, connection, consumer, strategies, blueprint, hub, qos,
             heartbeat, handle_unknown_message, handle_unknown_task,
             handle_invalid_task, clock, hbrate=2.0, **kwargs):
     """Fallback blocking eventloop for transports that doesn't support AIO."""
@@ -165,14 +178,14 @@ def synloop(obj, connection, consumer, strategies, ns, hub, qos,
 
     obj.on_ready()
 
-    while ns.state != CLOSE and obj.connection:
+    while blueprint.state != CLOSE and obj.connection:
         state.maybe_shutdown()
-        if qos.prev != qos.value:         # pragma: no cover
+        if qos.prev != qos.value:
             qos.update()
         try:
             connection.drain_events(timeout=2.0)
         except socket.timeout:
             pass
         except socket.error:
-            if ns.state != CLOSE:  # pragma: no cover
+            if blueprint.state != CLOSE:
                 raise