|
@@ -153,7 +153,7 @@ class test_Consumer(AppCase):
|
|
|
l.connection = Mock()
|
|
|
l.connection.info.return_value = {'foo': 'bar'}
|
|
|
l.controller = l.app.WorkController()
|
|
|
- l.controller.pool = Mock()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
l.controller.pool.info.return_value = [Mock(), Mock()]
|
|
|
l.controller.consumer = l
|
|
|
info = l.controller.stats()
|
|
@@ -167,6 +167,8 @@ class test_Consumer(AppCase):
|
|
|
|
|
|
def test_connection(self):
|
|
|
l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
|
|
|
l.blueprint.start(l)
|
|
|
self.assertIsInstance(l.connection, Connection)
|
|
@@ -229,6 +231,8 @@ class test_Consumer(AppCase):
|
|
|
def test_receive_message_eta_OverflowError(self, to_timestamp):
|
|
|
to_timestamp.side_effect = OverflowError()
|
|
|
l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
l.blueprint.state = RUN
|
|
|
l.steps.pop()
|
|
|
m = create_task_message(
|
|
@@ -251,6 +255,8 @@ class test_Consumer(AppCase):
|
|
|
l.blueprint.state = RUN
|
|
|
l.event_dispatcher = mock_event_dispatcher()
|
|
|
l.steps.pop()
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
m = create_task_message(
|
|
|
Mock(), self.foo_task.name,
|
|
|
args=(1, 2), kwargs='foobarbaz', id=1)
|
|
@@ -293,6 +299,8 @@ class test_Consumer(AppCase):
|
|
|
|
|
|
def test_receieve_message(self):
|
|
|
l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
l.blueprint.state = RUN
|
|
|
l.event_dispatcher = mock_event_dispatcher()
|
|
|
m = create_task_message(
|
|
@@ -322,6 +330,8 @@ class test_Consumer(AppCase):
|
|
|
|
|
|
l = MockConsumer(self.buffer.put, timer=self.timer,
|
|
|
send_events=False, pool=BasePool(), app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
l.channel_errors = (KeyError, )
|
|
|
with self.assertRaises(KeyError):
|
|
|
l.start()
|
|
@@ -340,6 +350,8 @@ class test_Consumer(AppCase):
|
|
|
|
|
|
l = MockConsumer(self.buffer.put, timer=self.timer,
|
|
|
send_events=False, pool=BasePool(), app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
|
|
|
l.connection_errors = (KeyError, )
|
|
|
self.assertRaises(SyntaxError, l.start)
|
|
@@ -422,6 +434,8 @@ class test_Consumer(AppCase):
|
|
|
def test_apply_eta_task(self):
|
|
|
from celery.worker import state
|
|
|
l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
l.qos = QoS(None, 10)
|
|
|
|
|
|
task = object()
|
|
@@ -433,6 +447,8 @@ class test_Consumer(AppCase):
|
|
|
|
|
|
def test_receieve_message_eta_isoformat(self):
|
|
|
l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
l.blueprint.state = RUN
|
|
|
l.steps.pop()
|
|
|
m = create_task_message(
|
|
@@ -518,6 +534,8 @@ class test_Consumer(AppCase):
|
|
|
@patch('celery.worker.consumer.logger')
|
|
|
def test_receieve_message_ack_raises(self, logger, warn):
|
|
|
l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
l.blueprint.state = RUN
|
|
|
channel = Mock()
|
|
|
m = create_task_message(
|
|
@@ -550,6 +568,8 @@ class test_Consumer(AppCase):
|
|
|
pp('TEST RECEIVE MESSAGE ETA')
|
|
|
pp('+CREATE MYKOMBUCONSUMER')
|
|
|
l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
pp('-CREATE MYKOMBUCONSUMER')
|
|
|
l.steps.pop()
|
|
|
l.event_dispatcher = mock_event_dispatcher()
|
|
@@ -732,6 +752,8 @@ class test_Consumer(AppCase):
|
|
|
init_callback = Mock()
|
|
|
l = _Consumer(self.buffer.put, timer=self.timer,
|
|
|
init_callback=init_callback, app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
l.task_consumer = Mock()
|
|
|
l.broadcast_consumer = Mock()
|
|
|
l.qos = _QoS()
|
|
@@ -754,6 +776,8 @@ class test_Consumer(AppCase):
|
|
|
init_callback.reset_mock()
|
|
|
l = _Consumer(self.buffer.put, timer=self.timer, app=self.app,
|
|
|
send_events=False, init_callback=init_callback)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
l.qos = _QoS()
|
|
|
l.task_consumer = Mock()
|
|
|
l.broadcast_consumer = Mock()
|
|
@@ -765,8 +789,9 @@ class test_Consumer(AppCase):
|
|
|
|
|
|
def test_reset_connection_with_no_node(self):
|
|
|
l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
+ l.controller = l.app.WorkController()
|
|
|
+ l.pool = l.controller.pool = Mock()
|
|
|
l.steps.pop()
|
|
|
- self.assertEqual(None, l.pool)
|
|
|
l.blueprint.start(l)
|
|
|
|
|
|
|