|  | @@ -9,6 +9,7 @@ from Queue import Empty
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  from billiard.exceptions import WorkerLostError
 | 
	
		
			
				|  |  |  from kombu import Connection
 | 
	
		
			
				|  |  | +from kombu.common import QoS, PREFETCH_COUNT_MAX, ignore_errors
 | 
	
		
			
				|  |  |  from kombu.exceptions import StdChannelError
 | 
	
		
			
				|  |  |  from kombu.transport.base import Message
 | 
	
		
			
				|  |  |  from mock import Mock, patch
 | 
	
	
		
			
				|  | @@ -16,6 +17,7 @@ from nose import SkipTest
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  from celery import current_app
 | 
	
		
			
				|  |  |  from celery.app.defaults import DEFAULTS
 | 
	
		
			
				|  |  | +from celery.bootsteps import RUN, CLOSE, TERMINATE, StartStopStep
 | 
	
		
			
				|  |  |  from celery.concurrency.base import BasePool
 | 
	
		
			
				|  |  |  from celery.datastructures import AttributeDict
 | 
	
		
			
				|  |  |  from celery.exceptions import SystemTerminate
 | 
	
	
		
			
				|  | @@ -23,33 +25,51 @@ from celery.task import task as task_dec
 | 
	
		
			
				|  |  |  from celery.task import periodic_task as periodic_task_dec
 | 
	
		
			
				|  |  |  from celery.utils import uuid
 | 
	
		
			
				|  |  |  from celery.worker import WorkController
 | 
	
		
			
				|  |  | -from celery.worker.components import Queues, Timers, EvLoop, Pool
 | 
	
		
			
				|  |  | +from celery.worker import components
 | 
	
		
			
				|  |  |  from celery.worker.buckets import FastQueue
 | 
	
		
			
				|  |  |  from celery.worker.job import Request
 | 
	
		
			
				|  |  | -from celery.worker.consumer import BlockingConsumer
 | 
	
		
			
				|  |  | -from celery.worker.consumer import QoS, RUN, PREFETCH_COUNT_MAX, CLOSE
 | 
	
		
			
				|  |  | +from celery.worker import consumer
 | 
	
		
			
				|  |  | +from celery.worker.consumer import Consumer
 | 
	
		
			
				|  |  |  from celery.utils.serialization import pickle
 | 
	
		
			
				|  |  |  from celery.utils.timer2 import Timer
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  from celery.tests.utils import AppCase, Case
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +def MockStep(step=None):
 | 
	
		
			
				|  |  | +    step = Mock() if step is None else step
 | 
	
		
			
				|  |  | +    step.namespace = Mock()
 | 
	
		
			
				|  |  | +    step.namespace.name = 'MockNS'
 | 
	
		
			
				|  |  | +    step.name = 'MockStep'
 | 
	
		
			
				|  |  | +    return step
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  class PlaceHolder(object):
 | 
	
		
			
				|  |  |          pass
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -class MyKombuConsumer(BlockingConsumer):
 | 
	
		
			
				|  |  | +def find_step(obj, typ):
 | 
	
		
			
				|  |  | +    return obj.namespace.steps[typ.name]
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class _MyKombuConsumer(Consumer):
 | 
	
		
			
				|  |  |      broadcast_consumer = Mock()
 | 
	
		
			
				|  |  |      task_consumer = Mock()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def __init__(self, *args, **kwargs):
 | 
	
		
			
				|  |  |          kwargs.setdefault('pool', BasePool(2))
 | 
	
		
			
				|  |  | -        super(MyKombuConsumer, self).__init__(*args, **kwargs)
 | 
	
		
			
				|  |  | +        super(_MyKombuConsumer, self).__init__(*args, **kwargs)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def restart_heartbeat(self):
 | 
	
		
			
				|  |  |          self.heart = None
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +class MyKombuConsumer(Consumer):
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def loop(self, *args, **kwargs):
 | 
	
		
			
				|  |  | +        pass
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  class MockNode(object):
 | 
	
		
			
				|  |  |      commands = []
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -166,19 +186,19 @@ class test_QoS(Case):
 | 
	
		
			
				|  |  |          self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_consumer_increment_decrement(self):
 | 
	
		
			
				|  |  | -        consumer = Mock()
 | 
	
		
			
				|  |  | -        qos = QoS(consumer, 10)
 | 
	
		
			
				|  |  | +        mconsumer = Mock()
 | 
	
		
			
				|  |  | +        qos = QoS(mconsumer.qos, 10)
 | 
	
		
			
				|  |  |          qos.update()
 | 
	
		
			
				|  |  |          self.assertEqual(qos.value, 10)
 | 
	
		
			
				|  |  | -        consumer.qos.assert_called_with(prefetch_count=10)
 | 
	
		
			
				|  |  | +        mconsumer.qos.assert_called_with(prefetch_count=10)
 | 
	
		
			
				|  |  |          qos.decrement_eventually()
 | 
	
		
			
				|  |  |          qos.update()
 | 
	
		
			
				|  |  |          self.assertEqual(qos.value, 9)
 | 
	
		
			
				|  |  | -        consumer.qos.assert_called_with(prefetch_count=9)
 | 
	
		
			
				|  |  | +        mconsumer.qos.assert_called_with(prefetch_count=9)
 | 
	
		
			
				|  |  |          qos.decrement_eventually()
 | 
	
		
			
				|  |  |          self.assertEqual(qos.value, 8)
 | 
	
		
			
				|  |  | -        consumer.qos.assert_called_with(prefetch_count=9)
 | 
	
		
			
				|  |  | -        self.assertIn({'prefetch_count': 9}, consumer.qos.call_args)
 | 
	
		
			
				|  |  | +        mconsumer.qos.assert_called_with(prefetch_count=9)
 | 
	
		
			
				|  |  | +        self.assertIn({'prefetch_count': 9}, mconsumer.qos.call_args)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          # Does not decrement 0 value
 | 
	
		
			
				|  |  |          qos.value = 0
 | 
	
	
		
			
				|  | @@ -188,8 +208,8 @@ class test_QoS(Case):
 | 
	
		
			
				|  |  |          self.assertEqual(qos.value, 0)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_consumer_decrement_eventually(self):
 | 
	
		
			
				|  |  | -        consumer = Mock()
 | 
	
		
			
				|  |  | -        qos = QoS(consumer, 10)
 | 
	
		
			
				|  |  | +        mconsumer = Mock()
 | 
	
		
			
				|  |  | +        qos = QoS(mconsumer.qos, 10)
 | 
	
		
			
				|  |  |          qos.decrement_eventually()
 | 
	
		
			
				|  |  |          self.assertEqual(qos.value, 9)
 | 
	
		
			
				|  |  |          qos.value = 0
 | 
	
	
		
			
				|  | @@ -197,8 +217,8 @@ class test_QoS(Case):
 | 
	
		
			
				|  |  |          self.assertEqual(qos.value, 0)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_set(self):
 | 
	
		
			
				|  |  | -        consumer = Mock()
 | 
	
		
			
				|  |  | -        qos = QoS(consumer, 10)
 | 
	
		
			
				|  |  | +        mconsumer = Mock()
 | 
	
		
			
				|  |  | +        qos = QoS(mconsumer.qos, 10)
 | 
	
		
			
				|  |  |          qos.set(12)
 | 
	
		
			
				|  |  |          self.assertEqual(qos.prev, 12)
 | 
	
		
			
				|  |  |          qos.set(qos.prev)
 | 
	
	
		
			
				|  | @@ -215,7 +235,8 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_info(self):
 | 
	
		
			
				|  |  |          l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | -        l.qos = QoS(l.task_consumer, 10)
 | 
	
		
			
				|  |  | +        l.task_consumer = Mock()
 | 
	
		
			
				|  |  | +        l.qos = QoS(l.task_consumer.qos, 10)
 | 
	
		
			
				|  |  |          info = l.info
 | 
	
		
			
				|  |  |          self.assertEqual(info['prefetch_count'], 10)
 | 
	
		
			
				|  |  |          self.assertFalse(info['broker'])
 | 
	
	
		
			
				|  | @@ -226,90 +247,102 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_start_when_closed(self):
 | 
	
		
			
				|  |  |          l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | -        l._state = CLOSE
 | 
	
		
			
				|  |  | +        l.namespace.state = CLOSE
 | 
	
		
			
				|  |  |          l.start()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_connection(self):
 | 
	
		
			
				|  |  |          l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.reset_connection()
 | 
	
		
			
				|  |  | +        l.namespace.start(l)
 | 
	
		
			
				|  |  |          self.assertIsInstance(l.connection, Connection)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l._state = RUN
 | 
	
		
			
				|  |  | +        l.namespace.state = RUN
 | 
	
		
			
				|  |  |          l.event_dispatcher = None
 | 
	
		
			
				|  |  | -        l.stop_consumers(close_connection=False)
 | 
	
		
			
				|  |  | +        l.namespace.restart(l)
 | 
	
		
			
				|  |  |          self.assertTrue(l.connection)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l._state = RUN
 | 
	
		
			
				|  |  | -        l.stop_consumers()
 | 
	
		
			
				|  |  | +        l.namespace.state = RUN
 | 
	
		
			
				|  |  | +        l.shutdown()
 | 
	
		
			
				|  |  |          self.assertIsNone(l.connection)
 | 
	
		
			
				|  |  |          self.assertIsNone(l.task_consumer)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.reset_connection()
 | 
	
		
			
				|  |  | +        l.namespace.start(l)
 | 
	
		
			
				|  |  |          self.assertIsInstance(l.connection, Connection)
 | 
	
		
			
				|  |  | -        l.stop_consumers()
 | 
	
		
			
				|  |  | +        l.namespace.restart(l)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          l.stop()
 | 
	
		
			
				|  |  | -        l.close_connection()
 | 
	
		
			
				|  |  | +        l.shutdown()
 | 
	
		
			
				|  |  |          self.assertIsNone(l.connection)
 | 
	
		
			
				|  |  |          self.assertIsNone(l.task_consumer)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_close_connection(self):
 | 
	
		
			
				|  |  |          l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | -        l._state = RUN
 | 
	
		
			
				|  |  | -        l.close_connection()
 | 
	
		
			
				|  |  | +        l.namespace.state = RUN
 | 
	
		
			
				|  |  | +        step = find_step(l, consumer.Connection)
 | 
	
		
			
				|  |  | +        conn = l.connection = Mock()
 | 
	
		
			
				|  |  | +        step.shutdown(l)
 | 
	
		
			
				|  |  | +        self.assertTrue(conn.close.called)
 | 
	
		
			
				|  |  | +        self.assertIsNone(l.connection)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  |          eventer = l.event_dispatcher = Mock()
 | 
	
		
			
				|  |  |          eventer.enabled = True
 | 
	
		
			
				|  |  |          heart = l.heart = MockHeart()
 | 
	
		
			
				|  |  | -        l._state = RUN
 | 
	
		
			
				|  |  | -        l.stop_consumers()
 | 
	
		
			
				|  |  | +        l.namespace.state = RUN
 | 
	
		
			
				|  |  | +        Events = find_step(l, consumer.Events)
 | 
	
		
			
				|  |  | +        Events.shutdown(l)
 | 
	
		
			
				|  |  | +        Heart = find_step(l, consumer.Heart)
 | 
	
		
			
				|  |  | +        Heart.shutdown(l)
 | 
	
		
			
				|  |  |          self.assertTrue(eventer.close.call_count)
 | 
	
		
			
				|  |  |          self.assertTrue(heart.closed)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @patch('celery.worker.consumer.warn')
 | 
	
		
			
				|  |  |      def test_receive_message_unknown(self, warn):
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = _MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l.steps.pop()
 | 
	
		
			
				|  |  |          backend = Mock()
 | 
	
		
			
				|  |  |          m = create_message(backend, unknown={'baz': '!!!'})
 | 
	
		
			
				|  |  |          l.event_dispatcher = Mock()
 | 
	
		
			
				|  |  | -        l.pidbox_node = MockNode()
 | 
	
		
			
				|  |  | +        l.node = MockNode()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.receive_message(m.decode(), m)
 | 
	
		
			
				|  |  | +        callback = self._get_on_message(l)
 | 
	
		
			
				|  |  | +        callback(m.decode(), m)
 | 
	
		
			
				|  |  |          self.assertTrue(warn.call_count)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    @patch('celery.utils.timer2.to_timestamp')
 | 
	
		
			
				|  |  | +    @patch('celery.worker.consumer.to_timestamp')
 | 
	
		
			
				|  |  |      def test_receive_message_eta_OverflowError(self, to_timestamp):
 | 
	
		
			
				|  |  |          to_timestamp.side_effect = OverflowError()
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = _MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l.steps.pop()
 | 
	
		
			
				|  |  |          m = create_message(Mock(), task=foo_task.name,
 | 
	
		
			
				|  |  |                                     args=('2, 2'),
 | 
	
		
			
				|  |  |                                     kwargs={},
 | 
	
		
			
				|  |  |                                     eta=datetime.now().isoformat())
 | 
	
		
			
				|  |  |          l.event_dispatcher = Mock()
 | 
	
		
			
				|  |  | -        l.pidbox_node = MockNode()
 | 
	
		
			
				|  |  | +        l.node = MockNode()
 | 
	
		
			
				|  |  |          l.update_strategies()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.receive_message(m.decode(), m)
 | 
	
		
			
				|  |  | +        callback = self._get_on_message(l)
 | 
	
		
			
				|  |  | +        callback(m.decode(), m)
 | 
	
		
			
				|  |  |          self.assertTrue(m.acknowledged)
 | 
	
		
			
				|  |  |          self.assertTrue(to_timestamp.call_count)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @patch('celery.worker.consumer.error')
 | 
	
		
			
				|  |  |      def test_receive_message_InvalidTaskError(self, error):
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = _MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l.steps.pop()
 | 
	
		
			
				|  |  |          m = create_message(Mock(), task=foo_task.name,
 | 
	
		
			
				|  |  |                             args=(1, 2), kwargs='foobarbaz', id=1)
 | 
	
		
			
				|  |  |          l.update_strategies()
 | 
	
		
			
				|  |  |          l.event_dispatcher = Mock()
 | 
	
		
			
				|  |  | -        l.pidbox_node = MockNode()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.receive_message(m.decode(), m)
 | 
	
		
			
				|  |  | +        callback = self._get_on_message(l)
 | 
	
		
			
				|  |  | +        callback(m.decode(), m)
 | 
	
		
			
				|  |  |          self.assertIn('Received invalid task message', error.call_args[0][0])
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @patch('celery.worker.consumer.crit')
 | 
	
		
			
				|  |  |      def test_on_decode_error(self, crit):
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = Consumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          class MockMessage(Mock):
 | 
	
		
			
				|  |  |              content_type = 'application/x-msgpack'
 | 
	
	
		
			
				|  | @@ -321,14 +354,25 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          self.assertTrue(message.ack.call_count)
 | 
	
		
			
				|  |  |          self.assertIn("Can't decode message body", crit.call_args[0][0])
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    def _get_on_message(self, l):
 | 
	
		
			
				|  |  | +        l.qos = Mock()
 | 
	
		
			
				|  |  | +        l.event_dispatcher = Mock()
 | 
	
		
			
				|  |  | +        l.task_consumer = Mock()
 | 
	
		
			
				|  |  | +        l.connection = Mock()
 | 
	
		
			
				|  |  | +        l.connection.drain_events.side_effect = SystemExit()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        with self.assertRaises(SystemExit):
 | 
	
		
			
				|  |  | +            l.loop(*l.loop_args())
 | 
	
		
			
				|  |  | +        self.assertTrue(l.task_consumer.register_callback.called)
 | 
	
		
			
				|  |  | +        return l.task_consumer.register_callback.call_args[0][0]
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      def test_receieve_message(self):
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = Consumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  |          m = create_message(Mock(), task=foo_task.name,
 | 
	
		
			
				|  |  |                             args=[2, 4, 8], kwargs={})
 | 
	
		
			
				|  |  |          l.update_strategies()
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        l.event_dispatcher = Mock()
 | 
	
		
			
				|  |  | -        l.receive_message(m.decode(), m)
 | 
	
		
			
				|  |  | +        callback = self._get_on_message(l)
 | 
	
		
			
				|  |  | +        callback(m.decode(), m)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          in_bucket = self.ready_queue.get_nowait()
 | 
	
		
			
				|  |  |          self.assertIsInstance(in_bucket, Request)
 | 
	
	
		
			
				|  | @@ -338,10 +382,10 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_start_connection_error(self):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        class MockConsumer(BlockingConsumer):
 | 
	
		
			
				|  |  | +        class MockConsumer(Consumer):
 | 
	
		
			
				|  |  |              iterations = 0
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            def consume_messages(self):
 | 
	
		
			
				|  |  | +            def loop(self, *args, **kwargs):
 | 
	
		
			
				|  |  |                  if not self.iterations:
 | 
	
		
			
				|  |  |                      self.iterations = 1
 | 
	
		
			
				|  |  |                      raise KeyError('foo')
 | 
	
	
		
			
				|  | @@ -359,10 +403,10 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          # Regression test for AMQPChannelExceptions that can occur within the
 | 
	
		
			
				|  |  |          # consumer. (i.e. 404 errors)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        class MockConsumer(BlockingConsumer):
 | 
	
		
			
				|  |  | +        class MockConsumer(Consumer):
 | 
	
		
			
				|  |  |              iterations = 0
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            def consume_messages(self):
 | 
	
		
			
				|  |  | +            def loop(self, *args, **kwargs):
 | 
	
		
			
				|  |  |                  if not self.iterations:
 | 
	
		
			
				|  |  |                      self.iterations = 1
 | 
	
		
			
				|  |  |                      raise KeyError('foo')
 | 
	
	
		
			
				|  | @@ -376,7 +420,7 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          l.heart.stop()
 | 
	
		
			
				|  |  |          l.timer.stop()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def test_consume_messages_ignores_socket_timeout(self):
 | 
	
		
			
				|  |  | +    def test_loop_ignores_socket_timeout(self):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          class Connection(current_app.connection().__class__):
 | 
	
		
			
				|  |  |              obj = None
 | 
	
	
		
			
				|  | @@ -389,10 +433,10 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          l.connection = Connection()
 | 
	
		
			
				|  |  |          l.task_consumer = Mock()
 | 
	
		
			
				|  |  |          l.connection.obj = l
 | 
	
		
			
				|  |  | -        l.qos = QoS(l.task_consumer, 10)
 | 
	
		
			
				|  |  | -        l.consume_messages()
 | 
	
		
			
				|  |  | +        l.qos = QoS(l.task_consumer.qos, 10)
 | 
	
		
			
				|  |  | +        l.loop(*l.loop_args())
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def test_consume_messages_when_socket_error(self):
 | 
	
		
			
				|  |  | +    def test_loop_when_socket_error(self):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          class Connection(current_app.connection().__class__):
 | 
	
		
			
				|  |  |              obj = None
 | 
	
	
		
			
				|  | @@ -401,20 +445,20 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |                  self.obj.connection = None
 | 
	
		
			
				|  |  |                  raise socket.error('foo')
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | -        l._state = RUN
 | 
	
		
			
				|  |  | +        l = Consumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l.namespace.state = RUN
 | 
	
		
			
				|  |  |          c = l.connection = Connection()
 | 
	
		
			
				|  |  |          l.connection.obj = l
 | 
	
		
			
				|  |  |          l.task_consumer = Mock()
 | 
	
		
			
				|  |  | -        l.qos = QoS(l.task_consumer, 10)
 | 
	
		
			
				|  |  | +        l.qos = QoS(l.task_consumer.qos, 10)
 | 
	
		
			
				|  |  |          with self.assertRaises(socket.error):
 | 
	
		
			
				|  |  | -            l.consume_messages()
 | 
	
		
			
				|  |  | +            l.loop(*l.loop_args())
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l._state = CLOSE
 | 
	
		
			
				|  |  | +        l.namespace.state = CLOSE
 | 
	
		
			
				|  |  |          l.connection = c
 | 
	
		
			
				|  |  | -        l.consume_messages()
 | 
	
		
			
				|  |  | +        l.loop(*l.loop_args())
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def test_consume_messages(self):
 | 
	
		
			
				|  |  | +    def test_loop(self):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          class Connection(current_app.connection().__class__):
 | 
	
		
			
				|  |  |              obj = None
 | 
	
	
		
			
				|  | @@ -422,17 +466,16 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |              def drain_events(self, **kwargs):
 | 
	
		
			
				|  |  |                  self.obj.connection = None
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = Consumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  |          l.connection = Connection()
 | 
	
		
			
				|  |  |          l.connection.obj = l
 | 
	
		
			
				|  |  |          l.task_consumer = Mock()
 | 
	
		
			
				|  |  | -        l.qos = QoS(l.task_consumer, 10)
 | 
	
		
			
				|  |  | +        l.qos = QoS(l.task_consumer.qos, 10)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.consume_messages()
 | 
	
		
			
				|  |  | -        l.consume_messages()
 | 
	
		
			
				|  |  | +        l.loop(*l.loop_args())
 | 
	
		
			
				|  |  | +        l.loop(*l.loop_args())
 | 
	
		
			
				|  |  |          self.assertTrue(l.task_consumer.consume.call_count)
 | 
	
		
			
				|  |  |          l.task_consumer.qos.assert_called_with(prefetch_count=10)
 | 
	
		
			
				|  |  | -        l.task_consumer.qos = Mock()
 | 
	
		
			
				|  |  |          self.assertEqual(l.qos.value, 10)
 | 
	
		
			
				|  |  |          l.qos.decrement_eventually()
 | 
	
		
			
				|  |  |          self.assertEqual(l.qos.value, 9)
 | 
	
	
		
			
				|  | @@ -440,15 +483,15 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          self.assertEqual(l.qos.value, 9)
 | 
	
		
			
				|  |  |          l.task_consumer.qos.assert_called_with(prefetch_count=9)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def test_maybe_conn_error(self):
 | 
	
		
			
				|  |  | +    def test_ignore_errors(self):
 | 
	
		
			
				|  |  |          l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  |          l.connection_errors = (KeyError, )
 | 
	
		
			
				|  |  |          l.channel_errors = (SyntaxError, )
 | 
	
		
			
				|  |  | -        l.maybe_conn_error(Mock(side_effect=AttributeError('foo')))
 | 
	
		
			
				|  |  | -        l.maybe_conn_error(Mock(side_effect=KeyError('foo')))
 | 
	
		
			
				|  |  | -        l.maybe_conn_error(Mock(side_effect=SyntaxError('foo')))
 | 
	
		
			
				|  |  | +        ignore_errors(l, Mock(side_effect=AttributeError('foo')))
 | 
	
		
			
				|  |  | +        ignore_errors(l, Mock(side_effect=KeyError('foo')))
 | 
	
		
			
				|  |  | +        ignore_errors(l, Mock(side_effect=SyntaxError('foo')))
 | 
	
		
			
				|  |  |          with self.assertRaises(IndexError):
 | 
	
		
			
				|  |  | -            l.maybe_conn_error(Mock(side_effect=IndexError('foo')))
 | 
	
		
			
				|  |  | +            ignore_errors(l, Mock(side_effect=IndexError('foo')))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_apply_eta_task(self):
 | 
	
		
			
				|  |  |          from celery.worker import state
 | 
	
	
		
			
				|  | @@ -463,18 +506,20 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          self.assertIs(self.ready_queue.get_nowait(), task)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_receieve_message_eta_isoformat(self):
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = _MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l.steps.pop()
 | 
	
		
			
				|  |  |          m = create_message(Mock(), task=foo_task.name,
 | 
	
		
			
				|  |  |                             eta=datetime.now().isoformat(),
 | 
	
		
			
				|  |  |                             args=[2, 4, 8], kwargs={})
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          l.task_consumer = Mock()
 | 
	
		
			
				|  |  | -        l.qos = QoS(l.task_consumer, l.initial_prefetch_count)
 | 
	
		
			
				|  |  | +        l.qos = QoS(l.task_consumer.qos, 1)
 | 
	
		
			
				|  |  |          current_pcount = l.qos.value
 | 
	
		
			
				|  |  |          l.event_dispatcher = Mock()
 | 
	
		
			
				|  |  |          l.enabled = False
 | 
	
		
			
				|  |  |          l.update_strategies()
 | 
	
		
			
				|  |  | -        l.receive_message(m.decode(), m)
 | 
	
		
			
				|  |  | +        callback = self._get_on_message(l)
 | 
	
		
			
				|  |  | +        callback(m.decode(), m)
 | 
	
		
			
				|  |  |          l.timer.stop()
 | 
	
		
			
				|  |  |          l.timer.join(1)
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -487,28 +532,30 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          self.assertGreater(l.qos.value, current_pcount)
 | 
	
		
			
				|  |  |          l.timer.stop()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def test_on_control(self):
 | 
	
		
			
				|  |  | +    def test_pidbox_callback(self):
 | 
	
		
			
				|  |  |          l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | -        l.pidbox_node = Mock()
 | 
	
		
			
				|  |  | -        l.reset_pidbox_node = Mock()
 | 
	
		
			
				|  |  | +        con = find_step(l, consumer.Control).box
 | 
	
		
			
				|  |  | +        con.node = Mock()
 | 
	
		
			
				|  |  | +        con.reset = Mock()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.on_control('foo', 'bar')
 | 
	
		
			
				|  |  | -        l.pidbox_node.handle_message.assert_called_with('foo', 'bar')
 | 
	
		
			
				|  |  | +        con.on_message('foo', 'bar')
 | 
	
		
			
				|  |  | +        con.node.handle_message.assert_called_with('foo', 'bar')
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.pidbox_node = Mock()
 | 
	
		
			
				|  |  | -        l.pidbox_node.handle_message.side_effect = KeyError('foo')
 | 
	
		
			
				|  |  | -        l.on_control('foo', 'bar')
 | 
	
		
			
				|  |  | -        l.pidbox_node.handle_message.assert_called_with('foo', 'bar')
 | 
	
		
			
				|  |  | +        con.node = Mock()
 | 
	
		
			
				|  |  | +        con.node.handle_message.side_effect = KeyError('foo')
 | 
	
		
			
				|  |  | +        con.on_message('foo', 'bar')
 | 
	
		
			
				|  |  | +        con.node.handle_message.assert_called_with('foo', 'bar')
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.pidbox_node = Mock()
 | 
	
		
			
				|  |  | -        l.pidbox_node.handle_message.side_effect = ValueError('foo')
 | 
	
		
			
				|  |  | -        l.on_control('foo', 'bar')
 | 
	
		
			
				|  |  | -        l.pidbox_node.handle_message.assert_called_with('foo', 'bar')
 | 
	
		
			
				|  |  | -        l.reset_pidbox_node.assert_called_with()
 | 
	
		
			
				|  |  | +        con.node = Mock()
 | 
	
		
			
				|  |  | +        con.node.handle_message.side_effect = ValueError('foo')
 | 
	
		
			
				|  |  | +        con.on_message('foo', 'bar')
 | 
	
		
			
				|  |  | +        con.node.handle_message.assert_called_with('foo', 'bar')
 | 
	
		
			
				|  |  | +        self.assertTrue(con.reset.called)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_revoke(self):
 | 
	
		
			
				|  |  |          ready_queue = FastQueue()
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = _MyKombuConsumer(ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l.steps.pop()
 | 
	
		
			
				|  |  |          backend = Mock()
 | 
	
		
			
				|  |  |          id = uuid()
 | 
	
		
			
				|  |  |          t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
 | 
	
	
		
			
				|  | @@ -516,16 +563,19 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          from celery.worker.state import revoked
 | 
	
		
			
				|  |  |          revoked.add(id)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.receive_message(t.decode(), t)
 | 
	
		
			
				|  |  | +        callback = self._get_on_message(l)
 | 
	
		
			
				|  |  | +        callback(t.decode(), t)
 | 
	
		
			
				|  |  |          self.assertTrue(ready_queue.empty())
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_receieve_message_not_registered(self):
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = _MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l.steps.pop()
 | 
	
		
			
				|  |  |          backend = Mock()
 | 
	
		
			
				|  |  |          m = create_message(backend, task='x.X.31x', args=[2, 4, 8], kwargs={})
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          l.event_dispatcher = Mock()
 | 
	
		
			
				|  |  | -        self.assertFalse(l.receive_message(m.decode(), m))
 | 
	
		
			
				|  |  | +        callback = self._get_on_message(l)
 | 
	
		
			
				|  |  | +        self.assertFalse(callback(m.decode(), m))
 | 
	
		
			
				|  |  |          with self.assertRaises(Empty):
 | 
	
		
			
				|  |  |              self.ready_queue.get_nowait()
 | 
	
		
			
				|  |  |          self.assertTrue(self.timer.empty())
 | 
	
	
		
			
				|  | @@ -533,7 +583,7 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |      @patch('celery.worker.consumer.warn')
 | 
	
		
			
				|  |  |      @patch('celery.worker.consumer.logger')
 | 
	
		
			
				|  |  |      def test_receieve_message_ack_raises(self, logger, warn):
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = Consumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  |          backend = Mock()
 | 
	
		
			
				|  |  |          m = create_message(backend, args=[2, 4, 8], kwargs={})
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -541,7 +591,8 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          l.connection_errors = (socket.error, )
 | 
	
		
			
				|  |  |          m.reject = Mock()
 | 
	
		
			
				|  |  |          m.reject.side_effect = socket.error('foo')
 | 
	
		
			
				|  |  | -        self.assertFalse(l.receive_message(m.decode(), m))
 | 
	
		
			
				|  |  | +        callback = self._get_on_message(l)
 | 
	
		
			
				|  |  | +        self.assertFalse(callback(m.decode(), m))
 | 
	
		
			
				|  |  |          self.assertTrue(warn.call_count)
 | 
	
		
			
				|  |  |          with self.assertRaises(Empty):
 | 
	
		
			
				|  |  |              self.ready_queue.get_nowait()
 | 
	
	
		
			
				|  | @@ -550,7 +601,8 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          self.assertTrue(logger.critical.call_count)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_receieve_message_eta(self):
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = _MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l.steps.pop()
 | 
	
		
			
				|  |  |          l.event_dispatcher = Mock()
 | 
	
		
			
				|  |  |          l.event_dispatcher._outbound_buffer = deque()
 | 
	
		
			
				|  |  |          backend = Mock()
 | 
	
	
		
			
				|  | @@ -559,16 +611,17 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |                             eta=(datetime.now() +
 | 
	
		
			
				|  |  |                                 timedelta(days=1)).isoformat())
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.reset_connection()
 | 
	
		
			
				|  |  | +        l.namespace.start(l)
 | 
	
		
			
				|  |  |          p = l.app.conf.BROKER_CONNECTION_RETRY
 | 
	
		
			
				|  |  |          l.app.conf.BROKER_CONNECTION_RETRY = False
 | 
	
		
			
				|  |  |          try:
 | 
	
		
			
				|  |  | -            l.reset_connection()
 | 
	
		
			
				|  |  | +            l.namespace.start(l)
 | 
	
		
			
				|  |  |          finally:
 | 
	
		
			
				|  |  |              l.app.conf.BROKER_CONNECTION_RETRY = p
 | 
	
		
			
				|  |  | -        l.stop_consumers()
 | 
	
		
			
				|  |  | +        l.namespace.restart(l)
 | 
	
		
			
				|  |  |          l.event_dispatcher = Mock()
 | 
	
		
			
				|  |  | -        l.receive_message(m.decode(), m)
 | 
	
		
			
				|  |  | +        callback = self._get_on_message(l)
 | 
	
		
			
				|  |  | +        callback(m.decode(), m)
 | 
	
		
			
				|  |  |          l.timer.stop()
 | 
	
		
			
				|  |  |          in_hold = l.timer.queue[0]
 | 
	
		
			
				|  |  |          self.assertEqual(len(in_hold), 3)
 | 
	
	
		
			
				|  | @@ -582,24 +635,33 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_reset_pidbox_node(self):
 | 
	
		
			
				|  |  |          l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | -        l.pidbox_node = Mock()
 | 
	
		
			
				|  |  | -        chan = l.pidbox_node.channel = Mock()
 | 
	
		
			
				|  |  | +        con = find_step(l, consumer.Control).box
 | 
	
		
			
				|  |  | +        con.node = Mock()
 | 
	
		
			
				|  |  | +        chan = con.node.channel = Mock()
 | 
	
		
			
				|  |  |          l.connection = Mock()
 | 
	
		
			
				|  |  |          chan.close.side_effect = socket.error('foo')
 | 
	
		
			
				|  |  |          l.connection_errors = (socket.error, )
 | 
	
		
			
				|  |  | -        l.reset_pidbox_node()
 | 
	
		
			
				|  |  | +        con.reset()
 | 
	
		
			
				|  |  |          chan.close.assert_called_with()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_reset_pidbox_node_green(self):
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | -        l.pool = Mock()
 | 
	
		
			
				|  |  | -        l.pool.is_green = True
 | 
	
		
			
				|  |  | -        l.reset_pidbox_node()
 | 
	
		
			
				|  |  | -        l.pool.spawn_n.assert_called_with(l._green_pidbox_node)
 | 
	
		
			
				|  |  | +        from celery.worker.pidbox import gPidbox
 | 
	
		
			
				|  |  | +        pool = Mock()
 | 
	
		
			
				|  |  | +        pool.is_green = True
 | 
	
		
			
				|  |  | +        l = MyKombuConsumer(self.ready_queue, timer=self.timer, pool=pool)
 | 
	
		
			
				|  |  | +        con = find_step(l, consumer.Control)
 | 
	
		
			
				|  |  | +        self.assertIsInstance(con.box, gPidbox)
 | 
	
		
			
				|  |  | +        con.start(l)
 | 
	
		
			
				|  |  | +        l.pool.spawn_n.assert_called_with(
 | 
	
		
			
				|  |  | +            con.box.loop, l,
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test__green_pidbox_node(self):
 | 
	
		
			
				|  |  | -        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | -        l.pidbox_node = Mock()
 | 
	
		
			
				|  |  | +        pool = Mock()
 | 
	
		
			
				|  |  | +        pool.is_green = True
 | 
	
		
			
				|  |  | +        l = MyKombuConsumer(self.ready_queue, timer=self.timer, pool=pool)
 | 
	
		
			
				|  |  | +        l.node = Mock()
 | 
	
		
			
				|  |  | +        controller = find_step(l, consumer.Control)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          class BConsumer(Mock):
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -610,7 +672,7 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |              def __exit__(self, *exc_info):
 | 
	
		
			
				|  |  |                  self.cancel()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.pidbox_node.listen = BConsumer()
 | 
	
		
			
				|  |  | +        controller.box.node.listen = BConsumer()
 | 
	
		
			
				|  |  |          connections = []
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          class Connection(object):
 | 
	
	
		
			
				|  | @@ -639,25 +701,26 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |                      self.calls += 1
 | 
	
		
			
				|  |  |                      raise socket.timeout()
 | 
	
		
			
				|  |  |                  self.obj.connection = None
 | 
	
		
			
				|  |  | -                self.obj._pidbox_node_shutdown.set()
 | 
	
		
			
				|  |  | +                controller.box._node_shutdown.set()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              def close(self):
 | 
	
		
			
				|  |  |                  self.closed = True
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          l.connection = Mock()
 | 
	
		
			
				|  |  | -        l._open_connection = lambda: Connection(obj=l)
 | 
	
		
			
				|  |  | -        l._green_pidbox_node()
 | 
	
		
			
				|  |  | +        l.connect = lambda: Connection(obj=l)
 | 
	
		
			
				|  |  | +        controller = find_step(l, consumer.Control)
 | 
	
		
			
				|  |  | +        controller.box.loop(l)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.pidbox_node.listen.assert_called_with(callback=l.on_control)
 | 
	
		
			
				|  |  | -        self.assertTrue(l.broadcast_consumer)
 | 
	
		
			
				|  |  | -        l.broadcast_consumer.consume.assert_called_with()
 | 
	
		
			
				|  |  | +        self.assertTrue(controller.box.node.listen.called)
 | 
	
		
			
				|  |  | +        self.assertTrue(controller.box.consumer)
 | 
	
		
			
				|  |  | +        controller.box.consumer.consume.assert_called_with()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          self.assertIsNone(l.connection)
 | 
	
		
			
				|  |  |          self.assertTrue(connections[0].closed)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @patch('kombu.connection.Connection._establish_connection')
 | 
	
		
			
				|  |  |      @patch('kombu.utils.sleep')
 | 
	
		
			
				|  |  | -    def test_open_connection_errback(self, sleep, connect):
 | 
	
		
			
				|  |  | +    def test_connect_errback(self, sleep, connect):
 | 
	
		
			
				|  |  |          l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  |          from kombu.transport.memory import Transport
 | 
	
		
			
				|  |  |          Transport.connection_errors = (StdChannelError, )
 | 
	
	
		
			
				|  | @@ -667,17 +730,18 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |                  return
 | 
	
		
			
				|  |  |              raise StdChannelError()
 | 
	
		
			
				|  |  |          connect.side_effect = effect
 | 
	
		
			
				|  |  | -        l._open_connection()
 | 
	
		
			
				|  |  | +        l.connect()
 | 
	
		
			
				|  |  |          connect.assert_called_with()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_stop_pidbox_node(self):
 | 
	
		
			
				|  |  |          l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | -        l._pidbox_node_stopped = Event()
 | 
	
		
			
				|  |  | -        l._pidbox_node_shutdown = Event()
 | 
	
		
			
				|  |  | -        l._pidbox_node_stopped.set()
 | 
	
		
			
				|  |  | -        l.stop_pidbox_node()
 | 
	
		
			
				|  |  | +        cont = find_step(l, consumer.Control)
 | 
	
		
			
				|  |  | +        cont._node_stopped = Event()
 | 
	
		
			
				|  |  | +        cont._node_shutdown = Event()
 | 
	
		
			
				|  |  | +        cont._node_stopped.set()
 | 
	
		
			
				|  |  | +        cont.stop(l)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def test_start__consume_messages(self):
 | 
	
		
			
				|  |  | +    def test_start__loop(self):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          class _QoS(object):
 | 
	
		
			
				|  |  |              prev = 3
 | 
	
	
		
			
				|  | @@ -702,18 +766,17 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          l.connection = Connection()
 | 
	
		
			
				|  |  |          l.iterations = 0
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        def raises_KeyError(limit=None):
 | 
	
		
			
				|  |  | +        def raises_KeyError(*args, **kwargs):
 | 
	
		
			
				|  |  |              l.iterations += 1
 | 
	
		
			
				|  |  |              if l.qos.prev != l.qos.value:
 | 
	
		
			
				|  |  |                  l.qos.update()
 | 
	
		
			
				|  |  |              if l.iterations >= 2:
 | 
	
		
			
				|  |  |                  raise KeyError('foo')
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        l.consume_messages = raises_KeyError
 | 
	
		
			
				|  |  | +        l.loop = raises_KeyError
 | 
	
		
			
				|  |  |          with self.assertRaises(KeyError):
 | 
	
		
			
				|  |  |              l.start()
 | 
	
		
			
				|  |  | -        self.assertTrue(init_callback.call_count)
 | 
	
		
			
				|  |  | -        self.assertEqual(l.iterations, 1)
 | 
	
		
			
				|  |  | +        self.assertEqual(l.iterations, 2)
 | 
	
		
			
				|  |  |          self.assertEqual(l.qos.prev, l.qos.value)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          init_callback.reset_mock()
 | 
	
	
		
			
				|  | @@ -723,25 +786,25 @@ class test_Consumer(Case):
 | 
	
		
			
				|  |  |          l.task_consumer = Mock()
 | 
	
		
			
				|  |  |          l.broadcast_consumer = Mock()
 | 
	
		
			
				|  |  |          l.connection = Connection()
 | 
	
		
			
				|  |  | -        l.consume_messages = Mock(side_effect=socket.error('foo'))
 | 
	
		
			
				|  |  | +        l.loop = Mock(side_effect=socket.error('foo'))
 | 
	
		
			
				|  |  |          with self.assertRaises(socket.error):
 | 
	
		
			
				|  |  |              l.start()
 | 
	
		
			
				|  |  | -        self.assertTrue(init_callback.call_count)
 | 
	
		
			
				|  |  | -        self.assertTrue(l.consume_messages.call_count)
 | 
	
		
			
				|  |  | +        self.assertTrue(l.loop.call_count)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_reset_connection_with_no_node(self):
 | 
	
		
			
				|  |  | -        l = BlockingConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = Consumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l.steps.pop()
 | 
	
		
			
				|  |  |          self.assertEqual(None, l.pool)
 | 
	
		
			
				|  |  | -        l.reset_connection()
 | 
	
		
			
				|  |  | +        l.namespace.start(l)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_on_task_revoked(self):
 | 
	
		
			
				|  |  | -        l = BlockingConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = Consumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  |          task = Mock()
 | 
	
		
			
				|  |  |          task.revoked.return_value = True
 | 
	
		
			
				|  |  |          l.on_task(task)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_on_task_no_events(self):
 | 
	
		
			
				|  |  | -        l = BlockingConsumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  | +        l = Consumer(self.ready_queue, timer=self.timer)
 | 
	
		
			
				|  |  |          task = Mock()
 | 
	
		
			
				|  |  |          task.revoked.return_value = False
 | 
	
		
			
				|  |  |          l.event_dispatcher = Mock()
 | 
	
	
		
			
				|  | @@ -756,7 +819,6 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |      def setup(self):
 | 
	
		
			
				|  |  |          self.worker = self.create_worker()
 | 
	
		
			
				|  |  |          from celery import worker
 | 
	
		
			
				|  |  | -        from celery.worker import components
 | 
	
		
			
				|  |  |          self._logger = worker.logger
 | 
	
		
			
				|  |  |          self._comp_logger = components.logger
 | 
	
		
			
				|  |  |          self.logger = worker.logger = Mock()
 | 
	
	
		
			
				|  | @@ -764,20 +826,19 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def teardown(self):
 | 
	
		
			
				|  |  |          from celery import worker
 | 
	
		
			
				|  |  | -        from celery.worker import components
 | 
	
		
			
				|  |  |          worker.logger = self._logger
 | 
	
		
			
				|  |  |          components.logger = self._comp_logger
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def create_worker(self, **kw):
 | 
	
		
			
				|  |  |          worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
 | 
	
		
			
				|  |  | -        worker._shutdown_complete.set()
 | 
	
		
			
				|  |  | +        worker.namespace.shutdown_complete.set()
 | 
	
		
			
				|  |  |          return worker
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @patch('celery.platforms.create_pidlock')
 | 
	
		
			
				|  |  |      def test_use_pidfile(self, create_pidlock):
 | 
	
		
			
				|  |  |          create_pidlock.return_value = Mock()
 | 
	
		
			
				|  |  |          worker = self.create_worker(pidfile='pidfilelockfilepid')
 | 
	
		
			
				|  |  | -        worker.components = []
 | 
	
		
			
				|  |  | +        worker.steps = []
 | 
	
		
			
				|  |  |          worker.start()
 | 
	
		
			
				|  |  |          self.assertTrue(create_pidlock.called)
 | 
	
		
			
				|  |  |          worker.stop()
 | 
	
	
		
			
				|  | @@ -824,12 +885,12 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |          self.assertTrue(worker.pool)
 | 
	
		
			
				|  |  |          self.assertTrue(worker.consumer)
 | 
	
		
			
				|  |  |          self.assertTrue(worker.mediator)
 | 
	
		
			
				|  |  | -        self.assertTrue(worker.components)
 | 
	
		
			
				|  |  | +        self.assertTrue(worker.steps)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_with_embedded_celerybeat(self):
 | 
	
		
			
				|  |  |          worker = WorkController(concurrency=1, loglevel=0, beat=True)
 | 
	
		
			
				|  |  |          self.assertTrue(worker.beat)
 | 
	
		
			
				|  |  | -        self.assertIn(worker.beat, worker.components)
 | 
	
		
			
				|  |  | +        self.assertIn(worker.beat, [w.obj for w in worker.steps])
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_with_autoscaler(self):
 | 
	
		
			
				|  |  |          worker = self.create_worker(autoscale=[10, 3], send_events=False,
 | 
	
	
		
			
				|  | @@ -839,17 +900,17 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |      def test_dont_stop_or_terminate(self):
 | 
	
		
			
				|  |  |          worker = WorkController(concurrency=1, loglevel=0)
 | 
	
		
			
				|  |  |          worker.stop()
 | 
	
		
			
				|  |  | -        self.assertNotEqual(worker._state, worker.CLOSE)
 | 
	
		
			
				|  |  | +        self.assertNotEqual(worker.namespace.state, CLOSE)
 | 
	
		
			
				|  |  |          worker.terminate()
 | 
	
		
			
				|  |  | -        self.assertNotEqual(worker._state, worker.CLOSE)
 | 
	
		
			
				|  |  | +        self.assertNotEqual(worker.namespace.state, CLOSE)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          sigsafe, worker.pool.signal_safe = worker.pool.signal_safe, False
 | 
	
		
			
				|  |  |          try:
 | 
	
		
			
				|  |  | -            worker._state = worker.RUN
 | 
	
		
			
				|  |  | +            worker.namespace.state = RUN
 | 
	
		
			
				|  |  |              worker.stop(in_sighandler=True)
 | 
	
		
			
				|  |  | -            self.assertNotEqual(worker._state, worker.CLOSE)
 | 
	
		
			
				|  |  | +            self.assertNotEqual(worker.namespace.state, CLOSE)
 | 
	
		
			
				|  |  |              worker.terminate(in_sighandler=True)
 | 
	
		
			
				|  |  | -            self.assertNotEqual(worker._state, worker.CLOSE)
 | 
	
		
			
				|  |  | +            self.assertNotEqual(worker.namespace.state, CLOSE)
 | 
	
		
			
				|  |  |          finally:
 | 
	
		
			
				|  |  |              worker.pool.signal_safe = sigsafe
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -859,14 +920,14 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |          try:
 | 
	
		
			
				|  |  |              raise KeyError('foo')
 | 
	
		
			
				|  |  |          except KeyError as exc:
 | 
	
		
			
				|  |  | -            Timers(worker).on_timer_error(exc)
 | 
	
		
			
				|  |  | +            components.Timer(worker).on_timer_error(exc)
 | 
	
		
			
				|  |  |              msg, args = self.comp_logger.error.call_args[0]
 | 
	
		
			
				|  |  |              self.assertIn('KeyError', msg % args)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_on_timer_tick(self):
 | 
	
		
			
				|  |  |          worker = WorkController(concurrency=1, loglevel=10)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        Timers(worker).on_timer_tick(30.0)
 | 
	
		
			
				|  |  | +        components.Timer(worker).on_timer_tick(30.0)
 | 
	
		
			
				|  |  |          xargs = self.comp_logger.debug.call_args[0]
 | 
	
		
			
				|  |  |          fmt, arg = xargs[0], xargs[1]
 | 
	
		
			
				|  |  |          self.assertEqual(30.0, arg)
 | 
	
	
		
			
				|  | @@ -891,11 +952,11 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |          m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
 | 
	
		
			
				|  |  |                             kwargs={})
 | 
	
		
			
				|  |  |          task = Request.from_message(m, m.decode())
 | 
	
		
			
				|  |  | -        worker.components = []
 | 
	
		
			
				|  |  | -        worker._state = worker.RUN
 | 
	
		
			
				|  |  | +        worker.steps = []
 | 
	
		
			
				|  |  | +        worker.namespace.state = RUN
 | 
	
		
			
				|  |  |          with self.assertRaises(KeyboardInterrupt):
 | 
	
		
			
				|  |  |              worker.process_task(task)
 | 
	
		
			
				|  |  | -        self.assertEqual(worker._state, worker.TERMINATE)
 | 
	
		
			
				|  |  | +        self.assertEqual(worker.namespace.state, TERMINATE)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_process_task_raise_SystemTerminate(self):
 | 
	
		
			
				|  |  |          worker = self.worker
 | 
	
	
		
			
				|  | @@ -905,11 +966,11 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |          m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
 | 
	
		
			
				|  |  |                             kwargs={})
 | 
	
		
			
				|  |  |          task = Request.from_message(m, m.decode())
 | 
	
		
			
				|  |  | -        worker.components = []
 | 
	
		
			
				|  |  | -        worker._state = worker.RUN
 | 
	
		
			
				|  |  | +        worker.steps = []
 | 
	
		
			
				|  |  | +        worker.namespace.state = RUN
 | 
	
		
			
				|  |  |          with self.assertRaises(SystemExit):
 | 
	
		
			
				|  |  |              worker.process_task(task)
 | 
	
		
			
				|  |  | -        self.assertEqual(worker._state, worker.TERMINATE)
 | 
	
		
			
				|  |  | +        self.assertEqual(worker.namespace.state, TERMINATE)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_process_task_raise_regular(self):
 | 
	
		
			
				|  |  |          worker = self.worker
 | 
	
	
		
			
				|  | @@ -924,17 +985,18 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_start_catches_base_exceptions(self):
 | 
	
		
			
				|  |  |          worker1 = self.create_worker()
 | 
	
		
			
				|  |  | -        stc = Mock()
 | 
	
		
			
				|  |  | +        stc = MockStep()
 | 
	
		
			
				|  |  |          stc.start.side_effect = SystemTerminate()
 | 
	
		
			
				|  |  | -        worker1.components = [stc]
 | 
	
		
			
				|  |  | +        worker1.steps = [stc]
 | 
	
		
			
				|  |  |          worker1.start()
 | 
	
		
			
				|  |  | +        stc.start.assert_called_with(worker1)
 | 
	
		
			
				|  |  |          self.assertTrue(stc.terminate.call_count)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          worker2 = self.create_worker()
 | 
	
		
			
				|  |  | -        sec = Mock()
 | 
	
		
			
				|  |  | +        sec = MockStep()
 | 
	
		
			
				|  |  |          sec.start.side_effect = SystemExit()
 | 
	
		
			
				|  |  |          sec.terminate = None
 | 
	
		
			
				|  |  | -        worker2.components = [sec]
 | 
	
		
			
				|  |  | +        worker2.steps = [sec]
 | 
	
		
			
				|  |  |          worker2.start()
 | 
	
		
			
				|  |  |          self.assertTrue(sec.stop.call_count)
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -986,14 +1048,19 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_start__stop(self):
 | 
	
		
			
				|  |  |          worker = self.worker
 | 
	
		
			
				|  |  | -        worker._shutdown_complete.set()
 | 
	
		
			
				|  |  | -        worker.components = [Mock(), Mock(), Mock(), Mock()]
 | 
	
		
			
				|  |  | +        worker.namespace.shutdown_complete.set()
 | 
	
		
			
				|  |  | +        worker.steps = [MockStep(StartStopStep(self)) for _ in range(4)]
 | 
	
		
			
				|  |  | +        worker.namespace.state = RUN
 | 
	
		
			
				|  |  | +        worker.namespace.started = 4
 | 
	
		
			
				|  |  | +        for w in worker.steps:
 | 
	
		
			
				|  |  | +            w.start = Mock()
 | 
	
		
			
				|  |  | +            w.stop = Mock()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          worker.start()
 | 
	
		
			
				|  |  | -        for w in worker.components:
 | 
	
		
			
				|  |  | +        for w in worker.steps:
 | 
	
		
			
				|  |  |              self.assertTrue(w.start.call_count)
 | 
	
		
			
				|  |  |          worker.stop()
 | 
	
		
			
				|  |  | -        for component in worker.components:
 | 
	
		
			
				|  |  | +        for w in worker.steps:
 | 
	
		
			
				|  |  |              self.assertTrue(w.stop.call_count)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          # Doesn't close pool if no pool.
 | 
	
	
		
			
				|  | @@ -1002,15 +1069,15 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |          worker.stop()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          # test that stop of None is not attempted
 | 
	
		
			
				|  |  | -        worker.components[-1] = None
 | 
	
		
			
				|  |  | +        worker.steps[-1] = None
 | 
	
		
			
				|  |  |          worker.start()
 | 
	
		
			
				|  |  |          worker.stop()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def test_component_raises(self):
 | 
	
		
			
				|  |  | +    def test_step_raises(self):
 | 
	
		
			
				|  |  |          worker = self.worker
 | 
	
		
			
				|  |  | -        comp = Mock()
 | 
	
		
			
				|  |  | -        worker.components = [comp]
 | 
	
		
			
				|  |  | -        comp.start.side_effect = TypeError()
 | 
	
		
			
				|  |  | +        step = Mock()
 | 
	
		
			
				|  |  | +        worker.steps = [step]
 | 
	
		
			
				|  |  | +        step.start.side_effect = TypeError()
 | 
	
		
			
				|  |  |          worker.stop = Mock()
 | 
	
		
			
				|  |  |          worker.start()
 | 
	
		
			
				|  |  |          worker.stop.assert_called_with()
 | 
	
	
		
			
				|  | @@ -1020,36 +1087,34 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_start__terminate(self):
 | 
	
		
			
				|  |  |          worker = self.worker
 | 
	
		
			
				|  |  | -        worker._shutdown_complete.set()
 | 
	
		
			
				|  |  | -        worker.components = [Mock(), Mock(), Mock(), Mock(), Mock()]
 | 
	
		
			
				|  |  | -        for component in worker.components[:3]:
 | 
	
		
			
				|  |  | -            component.terminate = None
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +        worker.namespace.shutdown_complete.set()
 | 
	
		
			
				|  |  | +        worker.namespace.started = 5
 | 
	
		
			
				|  |  | +        worker.namespace.state = RUN
 | 
	
		
			
				|  |  | +        worker.steps = [MockStep() for _ in range(5)]
 | 
	
		
			
				|  |  |          worker.start()
 | 
	
		
			
				|  |  | -        for w in worker.components[:3]:
 | 
	
		
			
				|  |  | +        for w in worker.steps[:3]:
 | 
	
		
			
				|  |  |              self.assertTrue(w.start.call_count)
 | 
	
		
			
				|  |  | -        self.assertTrue(worker._running, len(worker.components))
 | 
	
		
			
				|  |  | -        self.assertEqual(worker._state, RUN)
 | 
	
		
			
				|  |  | +        self.assertTrue(worker.namespace.started, len(worker.steps))
 | 
	
		
			
				|  |  | +        self.assertEqual(worker.namespace.state, RUN)
 | 
	
		
			
				|  |  |          worker.terminate()
 | 
	
		
			
				|  |  | -        for component in worker.components[:3]:
 | 
	
		
			
				|  |  | -            self.assertTrue(component.stop.call_count)
 | 
	
		
			
				|  |  | -        self.assertTrue(worker.components[4].terminate.call_count)
 | 
	
		
			
				|  |  | +        for step in worker.steps:
 | 
	
		
			
				|  |  | +            self.assertTrue(step.terminate.call_count)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_Queues_pool_not_rlimit_safe(self):
 | 
	
		
			
				|  |  |          w = Mock()
 | 
	
		
			
				|  |  |          w.pool_cls.rlimit_safe = False
 | 
	
		
			
				|  |  | -        Queues(w).create(w)
 | 
	
		
			
				|  |  | +        components.Queues(w).create(w)
 | 
	
		
			
				|  |  |          self.assertTrue(w.disable_rate_limits)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_Queues_pool_no_sem(self):
 | 
	
		
			
				|  |  |          w = Mock()
 | 
	
		
			
				|  |  |          w.pool_cls.uses_semaphore = False
 | 
	
		
			
				|  |  | -        Queues(w).create(w)
 | 
	
		
			
				|  |  | +        components.Queues(w).create(w)
 | 
	
		
			
				|  |  |          self.assertIs(w.ready_queue.put, w.process_task)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def test_EvLoop_crate(self):
 | 
	
		
			
				|  |  | +    def test_Hub_crate(self):
 | 
	
		
			
				|  |  |          w = Mock()
 | 
	
		
			
				|  |  | -        x = EvLoop(w)
 | 
	
		
			
				|  |  | +        x = components.Hub(w)
 | 
	
		
			
				|  |  |          hub = x.create(w)
 | 
	
		
			
				|  |  |          self.assertTrue(w.timer.max_interval)
 | 
	
		
			
				|  |  |          self.assertIs(w.hub, hub)
 | 
	
	
		
			
				|  | @@ -1058,7 +1123,7 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |          w = Mock()
 | 
	
		
			
				|  |  |          w.pool_cls = Mock()
 | 
	
		
			
				|  |  |          w.use_eventloop = False
 | 
	
		
			
				|  |  | -        pool = Pool(w)
 | 
	
		
			
				|  |  | +        pool = components.Pool(w)
 | 
	
		
			
				|  |  |          pool.create(w)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def test_Pool_create(self):
 | 
	
	
		
			
				|  | @@ -1070,7 +1135,7 @@ class test_WorkController(AppCase):
 | 
	
		
			
				|  |  |          P = w.pool_cls.return_value = Mock()
 | 
	
		
			
				|  |  |          P.timers = {Mock(): 30}
 | 
	
		
			
				|  |  |          w.use_eventloop = True
 | 
	
		
			
				|  |  | -        pool = Pool(w)
 | 
	
		
			
				|  |  | +        pool = components.Pool(w)
 | 
	
		
			
				|  |  |          pool.create(w)
 | 
	
		
			
				|  |  |          self.assertIsInstance(w.semaphore, BoundedSemaphore)
 | 
	
		
			
				|  |  |          self.assertTrue(w.hub.on_init)
 |