|
@@ -219,30 +219,20 @@ class test_Consumer(AppCase):
|
|
|
@patch('celery.worker.strategy.to_timestamp')
|
|
|
def test_receive_message_eta_OverflowError(self, to_timestamp):
|
|
|
to_timestamp.side_effect = OverflowError()
|
|
|
- print('+ CREATE _MyKombuConsumer')
|
|
|
l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- print('- CREATE _myKombuConsumer')
|
|
|
l.blueprint.state = RUN
|
|
|
l.steps.pop()
|
|
|
- print('+ CREATE MESSAGE')
|
|
|
m = create_message(Mock(), task=self.foo_task.name,
|
|
|
args=('2, 2'),
|
|
|
kwargs={},
|
|
|
eta=datetime.now().isoformat())
|
|
|
- print('- CREATE MESSAGE')
|
|
|
l.event_dispatcher = mock_event_dispatcher()
|
|
|
l.node = MockNode()
|
|
|
- print('+ UPDATE STRATEGIES')
|
|
|
l.update_strategies()
|
|
|
- print('- UPDATE STRATEGIES')
|
|
|
l.qos = Mock()
|
|
|
|
|
|
- print('+ GET ON MESSAGE')
|
|
|
callback = self._get_on_message(l)
|
|
|
- print('- GET ON MESSAGE')
|
|
|
- print('+ CALLBACK & m.decode()')
|
|
|
callback(m.decode(), m)
|
|
|
- print('- CALLBACK & m.decode()')
|
|
|
self.assertTrue(m.acknowledged)
|
|
|
|
|
|
@patch('celery.worker.consumer.error')
|
|
@@ -537,19 +527,33 @@ class test_Consumer(AppCase):
|
|
|
)
|
|
|
|
|
|
try:
|
|
|
+ print('+ BLUEPRINT START 1')
|
|
|
l.blueprint.start(l)
|
|
|
+ print('- BLUEPRINT START 1')
|
|
|
p = l.app.conf.BROKER_CONNECTION_RETRY
|
|
|
l.app.conf.BROKER_CONNECTION_RETRY = False
|
|
|
+ print('+ BLUEPRINT START 2')
|
|
|
l.blueprint.start(l)
|
|
|
+ print('- BLUEPRINT START 2')
|
|
|
l.app.conf.BROKER_CONNECTION_RETRY = p
|
|
|
+ print('+ BLUEPRINT RESTART')
|
|
|
l.blueprint.restart(l)
|
|
|
+ print('- BLUEPRINT RESTART')
|
|
|
l.event_dispatcher = mock_event_dispatcher()
|
|
|
+ print('+ GET ON MESSAGE')
|
|
|
callback = self._get_on_message(l)
|
|
|
+ print('- GET ON MESSAGE')
|
|
|
+ print('+ CALLBACK')
|
|
|
callback(m.decode(), m)
|
|
|
+ print('- CALLBACK')
|
|
|
finally:
|
|
|
+ print('+ STOP TIMER')
|
|
|
l.timer.stop()
|
|
|
+ print('- STOP TIMER')
|
|
|
try:
|
|
|
+ print('+ JOIN TIMER')
|
|
|
l.timer.join()
|
|
|
+ print('- JOIN TIMER')
|
|
|
except RuntimeError:
|
|
|
pass
|
|
|
|