|
@@ -6,12 +6,14 @@ import sys
|
|
|
|
|
|
from collections import deque
|
|
|
from datetime import datetime, timedelta
|
|
|
+from functools import partial
|
|
|
from threading import Event
|
|
|
|
|
|
from amqp import ChannelError
|
|
|
from kombu import Connection
|
|
|
from kombu.common import QoS, ignore_errors
|
|
|
from kombu.transport.base import Message
|
|
|
+from kombu.transport.memory import Transport
|
|
|
|
|
|
from celery.bootsteps import RUN, CLOSE, TERMINATE, StartStopStep
|
|
|
from celery.concurrency.base import BasePool
|
|
@@ -22,9 +24,12 @@ from celery.exceptions import (
|
|
|
from celery.five import Empty, range, Queue as FastQueue
|
|
|
from celery.platforms import EX_FAILURE
|
|
|
from celery.utils import uuid
|
|
|
+from celery import worker as worker_module
|
|
|
from celery.worker import components
|
|
|
from celery.worker import consumer
|
|
|
-from celery.worker.consumer import Consumer as __Consumer
|
|
|
+from celery.worker import state
|
|
|
+from celery.worker.consumer import Consumer
|
|
|
+from celery.worker.pidbox import gPidbox
|
|
|
from celery.worker.request import Request
|
|
|
from celery.utils import worker_direct
|
|
|
from celery.utils.serialization import pickle
|
|
@@ -34,8 +39,10 @@ from celery.tests.case import AppCase, Mock, TaskMessage, patch, skip
|
|
|
|
|
|
|
|
|
def MockStep(step=None):
|
|
|
- step = Mock() if step is None else step
|
|
|
- step.blueprint = Mock()
|
|
|
+ if step is None:
|
|
|
+ step = Mock(name='step')
|
|
|
+ else:
|
|
|
+ step.blueprint = Mock(name='step.blueprint')
|
|
|
step.blueprint.name = 'MockNS'
|
|
|
step.name = 'MockStep(%s)' % (id(step),)
|
|
|
return step
|
|
@@ -48,76 +55,12 @@ def mock_event_dispatcher():
|
|
|
return evd
|
|
|
|
|
|
|
|
|
-class PlaceHolder(object):
|
|
|
- pass
|
|
|
-
|
|
|
-
|
|
|
def find_step(obj, typ):
|
|
|
return obj.blueprint.steps[typ.name]
|
|
|
|
|
|
|
|
|
-class Consumer(__Consumer):
|
|
|
-
|
|
|
- def __init__(self, *args, **kwargs):
|
|
|
- kwargs.setdefault('without_mingle', True) # disable Mingle step
|
|
|
- kwargs.setdefault('without_gossip', True) # disable Gossip step
|
|
|
- kwargs.setdefault('without_heartbeat', True) # disable Heart step
|
|
|
- kwargs.setdefault('controller', Mock())
|
|
|
- super(Consumer, self).__init__(*args, **kwargs)
|
|
|
-
|
|
|
-
|
|
|
-class _MyKombuConsumer(Consumer):
|
|
|
- broadcast_consumer = Mock()
|
|
|
- task_consumer = Mock()
|
|
|
-
|
|
|
- def __init__(self, *args, **kwargs):
|
|
|
- kwargs.setdefault('pool', BasePool(2))
|
|
|
- kwargs.setdefault('controller', Mock())
|
|
|
- super(_MyKombuConsumer, self).__init__(*args, **kwargs)
|
|
|
-
|
|
|
- def restart_heartbeat(self):
|
|
|
- self.heart = None
|
|
|
-
|
|
|
-
|
|
|
-class MyKombuConsumer(Consumer):
|
|
|
-
|
|
|
- def loop(self, *args, **kwargs):
|
|
|
- pass
|
|
|
-
|
|
|
-
|
|
|
-class MockNode(object):
|
|
|
- commands = []
|
|
|
-
|
|
|
- def handle_message(self, body, message):
|
|
|
- self.commands.append(body.pop('command', None))
|
|
|
-
|
|
|
-
|
|
|
-class MockEventDispatcher(object):
|
|
|
- sent = []
|
|
|
- closed = False
|
|
|
- flushed = False
|
|
|
- _outbound_buffer = []
|
|
|
-
|
|
|
- def send(self, event, *args, **kwargs):
|
|
|
- self.sent.append(event)
|
|
|
-
|
|
|
- def close(self):
|
|
|
- self.closed = True
|
|
|
-
|
|
|
- def flush(self):
|
|
|
- self.flushed = True
|
|
|
-
|
|
|
-
|
|
|
-class MockHeart(object):
|
|
|
- closed = False
|
|
|
-
|
|
|
- def stop(self):
|
|
|
- self.closed = True
|
|
|
-
|
|
|
-
|
|
|
def create_message(channel, **data):
|
|
|
data.setdefault('id', uuid())
|
|
|
- channel.no_ack_consumers = set()
|
|
|
m = Message(channel, body=pickle.dumps(dict(**data)),
|
|
|
content_type='application/x-python-serialize',
|
|
|
content_encoding='binary',
|
|
@@ -147,133 +90,148 @@ class test_Consumer(AppCase):
|
|
|
def teardown(self):
|
|
|
self.timer.stop()
|
|
|
|
|
|
+ def LoopConsumer(self, buffer=None, controller=None, timer=None, app=None,
|
|
|
+ without_mingle=True, without_gossip=True,
|
|
|
+ without_heartbeat=True, **kwargs):
|
|
|
+ if controller is None:
|
|
|
+ controller = Mock(name='.controller')
|
|
|
+ buffer = buffer if buffer is not None else self.buffer.put
|
|
|
+ timer = timer if timer is not None else self.timer
|
|
|
+ app = app if app is not None else self.app
|
|
|
+ c = Consumer(
|
|
|
+ buffer,
|
|
|
+ timer=timer,
|
|
|
+ app=app,
|
|
|
+ controller=controller,
|
|
|
+ without_mingle=without_mingle,
|
|
|
+ without_gossip=without_gossip,
|
|
|
+ without_heartbeat=without_heartbeat,
|
|
|
+ **kwargs
|
|
|
+ )
|
|
|
+ c.task_consumer = Mock(name='.task_consumer')
|
|
|
+ c.qos = QoS(c.task_consumer.qos, 10)
|
|
|
+ c.connection = Mock(name='.connection')
|
|
|
+ c.controller = c.app.WorkController()
|
|
|
+ c.heart = Mock(name='.heart')
|
|
|
+ c.controller.consumer = c
|
|
|
+ c.pool = c.controller.pool = Mock(name='.controller.pool')
|
|
|
+ c.node = Mock(name='.node')
|
|
|
+ c.event_dispatcher = mock_event_dispatcher()
|
|
|
+ return c
|
|
|
+
|
|
|
+ def NoopConsumer(self, *args, **kwargs):
|
|
|
+ c = self.LoopConsumer(*args, **kwargs)
|
|
|
+ c.loop = Mock(name='.loop')
|
|
|
+ return c
|
|
|
+
|
|
|
def test_info(self):
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.task_consumer = Mock()
|
|
|
- l.qos = QoS(l.task_consumer.qos, 10)
|
|
|
- l.connection = Mock()
|
|
|
- l.connection.info.return_value = {'foo': 'bar'}
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
- l.controller.pool.info.return_value = [Mock(), Mock()]
|
|
|
- l.controller.consumer = l
|
|
|
- info = l.controller.stats()
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ c.connection.info.return_value = {'foo': 'bar'}
|
|
|
+ c.controller.pool.info.return_value = [Mock(), Mock()]
|
|
|
+ info = c.controller.stats()
|
|
|
self.assertEqual(info['prefetch_count'], 10)
|
|
|
self.assertTrue(info['broker'])
|
|
|
|
|
|
def test_start_when_closed(self):
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.blueprint.state = CLOSE
|
|
|
- l.start()
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ c.blueprint.state = CLOSE
|
|
|
+ c.start()
|
|
|
|
|
|
def test_connection(self):
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
+ c = self.NoopConsumer()
|
|
|
|
|
|
- l.blueprint.start(l)
|
|
|
- self.assertIsInstance(l.connection, Connection)
|
|
|
+ c.blueprint.start(c)
|
|
|
+ self.assertIsInstance(c.connection, Connection)
|
|
|
|
|
|
- l.blueprint.state = RUN
|
|
|
- l.event_dispatcher = None
|
|
|
- l.blueprint.restart(l)
|
|
|
- self.assertTrue(l.connection)
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ c.event_dispatcher = None
|
|
|
+ c.blueprint.restart(c)
|
|
|
+ self.assertTrue(c.connection)
|
|
|
|
|
|
- l.blueprint.state = RUN
|
|
|
- l.shutdown()
|
|
|
- self.assertIsNone(l.connection)
|
|
|
- self.assertIsNone(l.task_consumer)
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ c.shutdown()
|
|
|
+ self.assertIsNone(c.connection)
|
|
|
+ self.assertIsNone(c.task_consumer)
|
|
|
|
|
|
- l.blueprint.start(l)
|
|
|
- self.assertIsInstance(l.connection, Connection)
|
|
|
- l.blueprint.restart(l)
|
|
|
+ c.blueprint.start(c)
|
|
|
+ self.assertIsInstance(c.connection, Connection)
|
|
|
+ c.blueprint.restart(c)
|
|
|
|
|
|
- l.stop()
|
|
|
- l.shutdown()
|
|
|
- self.assertIsNone(l.connection)
|
|
|
- self.assertIsNone(l.task_consumer)
|
|
|
+ c.stop()
|
|
|
+ c.shutdown()
|
|
|
+ self.assertIsNone(c.connection)
|
|
|
+ self.assertIsNone(c.task_consumer)
|
|
|
|
|
|
def test_close_connection(self):
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.blueprint.state = RUN
|
|
|
- step = find_step(l, consumer.Connection)
|
|
|
- conn = l.connection = Mock()
|
|
|
- step.shutdown(l)
|
|
|
- conn.close.assert_called()
|
|
|
- self.assertIsNone(l.connection)
|
|
|
-
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- eventer = l.event_dispatcher = mock_event_dispatcher()
|
|
|
- eventer.enabled = True
|
|
|
- heart = l.heart = MockHeart()
|
|
|
- l.blueprint.state = RUN
|
|
|
- Events = find_step(l, consumer.Events)
|
|
|
- Events.shutdown(l)
|
|
|
- Heart = find_step(l, consumer.Heart)
|
|
|
- Heart.shutdown(l)
|
|
|
- self.assertTrue(eventer.close.call_count)
|
|
|
- self.assertTrue(heart.closed)
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ step = find_step(c, consumer.Connection)
|
|
|
+ connection = c.connection
|
|
|
+ step.shutdown(c)
|
|
|
+ connection.close.assert_called()
|
|
|
+ self.assertIsNone(c.connection)
|
|
|
+
|
|
|
+ def test_close_connection__heart_shutdown(self):
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ event_dispatcher = c.event_dispatcher
|
|
|
+ heart = c.heart
|
|
|
+ c.event_dispatcher.enabled = True
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ Events = find_step(c, consumer.Events)
|
|
|
+ Events.shutdown(c)
|
|
|
+ Heart = find_step(c, consumer.Heart)
|
|
|
+ Heart.shutdown(c)
|
|
|
+ event_dispatcher.close.assert_called()
|
|
|
+ heart.stop.assert_called_with()
|
|
|
|
|
|
@patch('celery.worker.consumer.consumer.warn')
|
|
|
def test_receive_message_unknown(self, warn):
|
|
|
- l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.blueprint.state = RUN
|
|
|
- l.steps.pop()
|
|
|
- channel = Mock()
|
|
|
+ c = self.LoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ c.steps.pop()
|
|
|
+ channel = Mock(name='.channeol')
|
|
|
m = create_message(channel, unknown={'baz': '!!!'})
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
- l.node = MockNode()
|
|
|
|
|
|
- callback = self._get_on_message(l)
|
|
|
+ callback = self._get_on_message(c)
|
|
|
callback(m)
|
|
|
self.assertTrue(warn.call_count)
|
|
|
|
|
|
@patch('celery.worker.strategy.to_timestamp')
|
|
|
def test_receive_message_eta_OverflowError(self, to_timestamp):
|
|
|
to_timestamp.side_effect = OverflowError()
|
|
|
- l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
- l.blueprint.state = RUN
|
|
|
- l.steps.pop()
|
|
|
+ c = self.LoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ c.steps.pop()
|
|
|
m = create_task_message(
|
|
|
Mock(), self.foo_task.name,
|
|
|
args=('2, 2'), kwargs={},
|
|
|
eta=datetime.now().isoformat(),
|
|
|
)
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
- l.node = MockNode()
|
|
|
- l.update_strategies()
|
|
|
- l.qos = Mock()
|
|
|
-
|
|
|
- callback = self._get_on_message(l)
|
|
|
+ c.update_strategies()
|
|
|
+ callback = self._get_on_message(c)
|
|
|
callback(m)
|
|
|
self.assertTrue(m.acknowledged)
|
|
|
|
|
|
@patch('celery.worker.consumer.consumer.error')
|
|
|
def test_receive_message_InvalidTaskError(self, error):
|
|
|
- l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.blueprint.state = RUN
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
- l.steps.pop()
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
+ c = self.LoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ c.steps.pop()
|
|
|
m = create_task_message(
|
|
|
Mock(), self.foo_task.name,
|
|
|
args=(1, 2), kwargs='foobarbaz', id=1)
|
|
|
- l.update_strategies()
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
- strat = l.strategies[self.foo_task.name] = Mock(name='strategy')
|
|
|
+ c.update_strategies()
|
|
|
+ strat = c.strategies[self.foo_task.name] = Mock(name='strategy')
|
|
|
strat.side_effect = InvalidTaskError()
|
|
|
|
|
|
- callback = self._get_on_message(l)
|
|
|
+ callback = self._get_on_message(c)
|
|
|
callback(m)
|
|
|
error.assert_called()
|
|
|
self.assertIn('Received invalid task message', error.call_args[0][0])
|
|
|
|
|
|
@patch('celery.worker.consumer.consumer.crit')
|
|
|
def test_on_decode_error(self, crit):
|
|
|
- l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
+ c = self.LoopConsumer()
|
|
|
|
|
|
class MockMessage(Mock):
|
|
|
content_type = 'application/x-msgpack'
|
|
@@ -281,35 +239,32 @@ class test_Consumer(AppCase):
|
|
|
body = 'foobarbaz'
|
|
|
|
|
|
message = MockMessage()
|
|
|
- l.on_decode_error(message, KeyError('foo'))
|
|
|
+ c.on_decode_error(message, KeyError('foo'))
|
|
|
self.assertTrue(message.ack.call_count)
|
|
|
self.assertIn("Can't decode message body", crit.call_args[0][0])
|
|
|
|
|
|
- def _get_on_message(self, l):
|
|
|
- if l.qos is None:
|
|
|
- l.qos = Mock()
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
- l.task_consumer = Mock()
|
|
|
- l.connection = Mock()
|
|
|
- l.connection.drain_events.side_effect = WorkerShutdown()
|
|
|
+ def _get_on_message(self, c):
|
|
|
+ if c.qos is None:
|
|
|
+ c.qos = Mock()
|
|
|
+ c.task_consumer = Mock()
|
|
|
+ c.event_dispatcher = mock_event_dispatcher()
|
|
|
+ c.connection = Mock(name='.connection')
|
|
|
+ c.connection.drain_events.side_effect = WorkerShutdown()
|
|
|
|
|
|
with self.assertRaises(WorkerShutdown):
|
|
|
- l.loop(*l.loop_args())
|
|
|
- self.assertTrue(l.task_consumer.on_message)
|
|
|
- return l.task_consumer.on_message
|
|
|
+ c.loop(*c.loop_args())
|
|
|
+ self.assertTrue(c.task_consumer.on_message)
|
|
|
+ return c.task_consumer.on_message
|
|
|
|
|
|
def test_receieve_message(self):
|
|
|
- l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
- l.blueprint.state = RUN
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
+ c = self.LoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
m = create_task_message(
|
|
|
Mock(), self.foo_task.name,
|
|
|
args=[2, 4, 8], kwargs={},
|
|
|
)
|
|
|
- l.update_strategies()
|
|
|
- callback = self._get_on_message(l)
|
|
|
+ c.update_strategies()
|
|
|
+ callback = self._get_on_message(c)
|
|
|
callback(m)
|
|
|
|
|
|
in_bucket = self.buffer.get_nowait()
|
|
@@ -319,45 +274,24 @@ class test_Consumer(AppCase):
|
|
|
self.assertTrue(self.timer.empty())
|
|
|
|
|
|
def test_start_channel_error(self):
|
|
|
-
|
|
|
- class MockConsumer(Consumer):
|
|
|
- iterations = 0
|
|
|
-
|
|
|
- def loop(self, *args, **kwargs):
|
|
|
- if not self.iterations:
|
|
|
- self.iterations = 1
|
|
|
- raise KeyError('foo')
|
|
|
- raise SyntaxError('bar')
|
|
|
-
|
|
|
- l = MockConsumer(self.buffer.put, timer=self.timer,
|
|
|
- send_events=False, pool=BasePool(), app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
- l.channel_errors = (KeyError,)
|
|
|
- with self.assertRaises(KeyError):
|
|
|
- l.start()
|
|
|
- l.timer.stop()
|
|
|
+ c = self.NoopConsumer(send_events=False, pool=BasePool())
|
|
|
+ c.loop.on_nth_call_do_raise(KeyError('foo'), SyntaxError('bar'))
|
|
|
+ c.channel_errors = (KeyError,)
|
|
|
+ try:
|
|
|
+ with self.assertRaises(KeyError):
|
|
|
+ c.start()
|
|
|
+ finally:
|
|
|
+ c.timer and c.timer.stop()
|
|
|
|
|
|
def test_start_connection_error(self):
|
|
|
-
|
|
|
- class MockConsumer(Consumer):
|
|
|
- iterations = 0
|
|
|
-
|
|
|
- def loop(self, *args, **kwargs):
|
|
|
- if not self.iterations:
|
|
|
- self.iterations = 1
|
|
|
- raise KeyError('foo')
|
|
|
- raise SyntaxError('bar')
|
|
|
-
|
|
|
- l = MockConsumer(self.buffer.put, timer=self.timer,
|
|
|
- send_events=False, pool=BasePool(), app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
-
|
|
|
- l.connection_errors = (KeyError,)
|
|
|
- with self.assertRaises(SyntaxError):
|
|
|
- l.start()
|
|
|
- l.timer.stop()
|
|
|
+ c = self.NoopConsumer(send_events=False, pool=BasePool())
|
|
|
+ c.loop.on_nth_call_do_raise(KeyError('foo'), SyntaxError('bar'))
|
|
|
+ c.connection_errors = (KeyError,)
|
|
|
+ try:
|
|
|
+ with self.assertRaises(SyntaxError):
|
|
|
+ c.start()
|
|
|
+ finally:
|
|
|
+ c.timer and c.timer.stop()
|
|
|
|
|
|
def test_loop_ignores_socket_timeout(self):
|
|
|
|
|
@@ -368,12 +302,11 @@ class test_Consumer(AppCase):
|
|
|
self.obj.connection = None
|
|
|
raise socket.timeout(10)
|
|
|
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.connection = Connection()
|
|
|
- l.task_consumer = Mock()
|
|
|
- l.connection.obj = l
|
|
|
- l.qos = QoS(l.task_consumer.qos, 10)
|
|
|
- l.loop(*l.loop_args())
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ c.connection = Connection()
|
|
|
+ c.connection.obj = c
|
|
|
+ c.qos = QoS(c.task_consumer.qos, 10)
|
|
|
+ c.loop(*c.loop_args())
|
|
|
|
|
|
def test_loop_when_socket_error(self):
|
|
|
|
|
@@ -384,18 +317,17 @@ class test_Consumer(AppCase):
|
|
|
self.obj.connection = None
|
|
|
raise socket.error('foo')
|
|
|
|
|
|
- l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.blueprint.state = RUN
|
|
|
- c = l.connection = Connection()
|
|
|
- l.connection.obj = l
|
|
|
- l.task_consumer = Mock()
|
|
|
- l.qos = QoS(l.task_consumer.qos, 10)
|
|
|
+ c = self.LoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ conn = c.connection = Connection()
|
|
|
+ c.connection.obj = c
|
|
|
+ c.qos = QoS(c.task_consumer.qos, 10)
|
|
|
with self.assertRaises(socket.error):
|
|
|
- l.loop(*l.loop_args())
|
|
|
+ c.loop(*c.loop_args())
|
|
|
|
|
|
- l.blueprint.state = CLOSE
|
|
|
- l.connection = c
|
|
|
- l.loop(*l.loop_args())
|
|
|
+ c.blueprint.state = CLOSE
|
|
|
+ c.connection = conn
|
|
|
+ c.loop(*c.loop_args())
|
|
|
|
|
|
def test_loop(self):
|
|
|
|
|
@@ -405,70 +337,61 @@ class test_Consumer(AppCase):
|
|
|
def drain_events(self, **kwargs):
|
|
|
self.obj.connection = None
|
|
|
|
|
|
- l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.blueprint.state = RUN
|
|
|
- l.connection = Connection()
|
|
|
- l.connection.obj = l
|
|
|
- l.task_consumer = Mock()
|
|
|
- l.qos = QoS(l.task_consumer.qos, 10)
|
|
|
-
|
|
|
- l.loop(*l.loop_args())
|
|
|
- l.loop(*l.loop_args())
|
|
|
- self.assertTrue(l.task_consumer.consume.call_count)
|
|
|
- l.task_consumer.qos.assert_called_with(prefetch_count=10)
|
|
|
- self.assertEqual(l.qos.value, 10)
|
|
|
- l.qos.decrement_eventually()
|
|
|
- self.assertEqual(l.qos.value, 9)
|
|
|
- l.qos.update()
|
|
|
- self.assertEqual(l.qos.value, 9)
|
|
|
- l.task_consumer.qos.assert_called_with(prefetch_count=9)
|
|
|
+ c = self.LoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ c.connection = Connection()
|
|
|
+ c.connection.obj = c
|
|
|
+ c.qos = QoS(c.task_consumer.qos, 10)
|
|
|
+
|
|
|
+ c.loop(*c.loop_args())
|
|
|
+ c.loop(*c.loop_args())
|
|
|
+ self.assertTrue(c.task_consumer.consume.call_count)
|
|
|
+ c.task_consumer.qos.assert_called_with(prefetch_count=10)
|
|
|
+ self.assertEqual(c.qos.value, 10)
|
|
|
+ c.qos.decrement_eventually()
|
|
|
+ self.assertEqual(c.qos.value, 9)
|
|
|
+ c.qos.update()
|
|
|
+ self.assertEqual(c.qos.value, 9)
|
|
|
+ c.task_consumer.qos.assert_called_with(prefetch_count=9)
|
|
|
|
|
|
def test_ignore_errors(self):
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.connection_errors = (AttributeError, KeyError,)
|
|
|
- l.channel_errors = (SyntaxError,)
|
|
|
- ignore_errors(l, Mock(side_effect=AttributeError('foo')))
|
|
|
- ignore_errors(l, Mock(side_effect=KeyError('foo')))
|
|
|
- ignore_errors(l, Mock(side_effect=SyntaxError('foo')))
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ c.connection_errors = (AttributeError, KeyError,)
|
|
|
+ c.channel_errors = (SyntaxError,)
|
|
|
+ ignore_errors(c, Mock(side_effect=AttributeError('foo')))
|
|
|
+ ignore_errors(c, Mock(side_effect=KeyError('foo')))
|
|
|
+ ignore_errors(c, Mock(side_effect=SyntaxError('foo')))
|
|
|
with self.assertRaises(IndexError):
|
|
|
- ignore_errors(l, Mock(side_effect=IndexError('foo')))
|
|
|
+ ignore_errors(c, Mock(side_effect=IndexError('foo')))
|
|
|
|
|
|
def test_apply_eta_task(self):
|
|
|
- from celery.worker import state
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
- l.qos = QoS(None, 10)
|
|
|
-
|
|
|
- task = object()
|
|
|
- qos = l.qos.value
|
|
|
- l.apply_eta_task(task)
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ c.qos = QoS(None, 10)
|
|
|
+ task = Mock(name='task', id='1234213')
|
|
|
+ qos = c.qos.value
|
|
|
+ c.apply_eta_task(task)
|
|
|
self.assertIn(task, state.reserved_requests)
|
|
|
- self.assertEqual(l.qos.value, qos - 1)
|
|
|
+ self.assertEqual(c.qos.value, qos - 1)
|
|
|
self.assertIs(self.buffer.get_nowait(), task)
|
|
|
|
|
|
def test_receieve_message_eta_isoformat(self):
|
|
|
- l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
- l.blueprint.state = RUN
|
|
|
- l.steps.pop()
|
|
|
+ c = self.LoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ c.steps.pop()
|
|
|
m = create_task_message(
|
|
|
Mock(), self.foo_task.name,
|
|
|
eta=(datetime.now() + timedelta(days=1)).isoformat(),
|
|
|
args=[2, 4, 8], kwargs={},
|
|
|
)
|
|
|
|
|
|
- l.task_consumer = Mock()
|
|
|
- l.qos = QoS(l.task_consumer.qos, 1)
|
|
|
- current_pcount = l.qos.value
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
- l.enabled = False
|
|
|
- l.update_strategies()
|
|
|
- callback = self._get_on_message(l)
|
|
|
+ c.qos = QoS(c.task_consumer.qos, 1)
|
|
|
+ current_pcount = c.qos.value
|
|
|
+ c.event_dispatcher.enabled = False
|
|
|
+ c.update_strategies()
|
|
|
+ callback = self._get_on_message(c)
|
|
|
callback(m)
|
|
|
- l.timer.stop()
|
|
|
- l.timer.join(1)
|
|
|
+ c.timer.stop()
|
|
|
+ c.timer.join(1)
|
|
|
|
|
|
items = [entry[2] for entry in self.timer.queue]
|
|
|
found = 0
|
|
@@ -476,12 +399,12 @@ class test_Consumer(AppCase):
|
|
|
if item.args[0].name == self.foo_task.name:
|
|
|
found = True
|
|
|
self.assertTrue(found)
|
|
|
- self.assertGreater(l.qos.value, current_pcount)
|
|
|
- l.timer.stop()
|
|
|
+ self.assertGreater(c.qos.value, current_pcount)
|
|
|
+ c.timer.stop()
|
|
|
|
|
|
def test_pidbox_callback(self):
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- con = find_step(l, consumer.Control).box
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ con = find_step(c, consumer.Control).box
|
|
|
con.node = Mock()
|
|
|
con.reset = Mock()
|
|
|
|
|
@@ -500,33 +423,32 @@ class test_Consumer(AppCase):
|
|
|
con.reset.assert_called()
|
|
|
|
|
|
def test_revoke(self):
|
|
|
- l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.blueprint.state = RUN
|
|
|
- l.steps.pop()
|
|
|
- channel = Mock()
|
|
|
+ c = self.LoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ c.steps.pop()
|
|
|
+ channel = Mock(name='channel')
|
|
|
id = uuid()
|
|
|
t = create_task_message(
|
|
|
channel, self.foo_task.name,
|
|
|
args=[2, 4, 8], kwargs={}, id=id,
|
|
|
)
|
|
|
- from celery.worker.state import revoked
|
|
|
- revoked.add(id)
|
|
|
|
|
|
- callback = self._get_on_message(l)
|
|
|
+ state.revoked.add(id)
|
|
|
+
|
|
|
+ callback = self._get_on_message(c)
|
|
|
callback(t)
|
|
|
self.assertTrue(self.buffer.empty())
|
|
|
|
|
|
def test_receieve_message_not_registered(self):
|
|
|
- l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.blueprint.state = RUN
|
|
|
- l.steps.pop()
|
|
|
+ c = self.LoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ c.steps.pop()
|
|
|
channel = Mock(name='channel')
|
|
|
m = create_task_message(
|
|
|
channel, 'x.X.31x', args=[2, 4, 8], kwargs={},
|
|
|
)
|
|
|
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
- callback = self._get_on_message(l)
|
|
|
+ callback = self._get_on_message(c)
|
|
|
self.assertFalse(callback(m))
|
|
|
with self.assertRaises(Empty):
|
|
|
self.buffer.get_nowait()
|
|
@@ -535,33 +457,28 @@ class test_Consumer(AppCase):
|
|
|
@patch('celery.worker.consumer.consumer.warn')
|
|
|
@patch('celery.worker.consumer.consumer.logger')
|
|
|
def test_receieve_message_ack_raises(self, logger, warn):
|
|
|
- l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
- l.blueprint.state = RUN
|
|
|
- channel = Mock()
|
|
|
+ c = self.LoopConsumer()
|
|
|
+ c.blueprint.state = RUN
|
|
|
+ channel = Mock(name='channel')
|
|
|
m = create_task_message(
|
|
|
channel, self.foo_task.name,
|
|
|
args=[2, 4, 8], kwargs={},
|
|
|
)
|
|
|
m.headers = None
|
|
|
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
- l.update_strategies()
|
|
|
- l.connection_errors = (socket.error,)
|
|
|
+ c.update_strategies()
|
|
|
+ c.connection_errors = (socket.error,)
|
|
|
m.reject = Mock()
|
|
|
m.reject.side_effect = socket.error('foo')
|
|
|
- callback = self._get_on_message(l)
|
|
|
+ callback = self._get_on_message(c)
|
|
|
self.assertFalse(callback(m))
|
|
|
self.assertTrue(warn.call_count)
|
|
|
with self.assertRaises(Empty):
|
|
|
self.buffer.get_nowait()
|
|
|
self.assertTrue(self.timer.empty())
|
|
|
- m.reject_log_error.assert_called_with(logger, l.connection_errors)
|
|
|
+ m.reject_log_error.assert_called_with(logger, c.connection_errors)
|
|
|
|
|
|
def test_receive_message_eta(self):
|
|
|
- import sys
|
|
|
- from functools import partial
|
|
|
if os.environ.get('C_DEBUG_TEST'):
|
|
|
pp = partial(print, file=sys.__stderr__)
|
|
|
else:
|
|
@@ -569,12 +486,9 @@ class test_Consumer(AppCase):
|
|
|
pass
|
|
|
pp('TEST RECEIVE MESSAGE ETA')
|
|
|
pp('+CREATE MYKOMBUCONSUMER')
|
|
|
- l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
+ c = self.LoopConsumer()
|
|
|
pp('-CREATE MYKOMBUCONSUMER')
|
|
|
- l.steps.pop()
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
+ c.steps.pop()
|
|
|
channel = Mock(name='channel')
|
|
|
pp('+ CREATE MESSAGE')
|
|
|
m = create_task_message(
|
|
@@ -586,36 +500,35 @@ class test_Consumer(AppCase):
|
|
|
|
|
|
try:
|
|
|
pp('+ BLUEPRINT START 1')
|
|
|
- l.blueprint.start(l)
|
|
|
+ c.blueprint.start(c)
|
|
|
pp('- BLUEPRINT START 1')
|
|
|
- p = l.app.conf.broker_connection_retry
|
|
|
- l.app.conf.broker_connection_retry = False
|
|
|
+ p = c.app.conf.broker_connection_retry
|
|
|
+ c.app.conf.broker_connection_retry = False
|
|
|
pp('+ BLUEPRINT START 2')
|
|
|
- l.blueprint.start(l)
|
|
|
+ c.blueprint.start(c)
|
|
|
pp('- BLUEPRINT START 2')
|
|
|
- l.app.conf.broker_connection_retry = p
|
|
|
+ c.app.conf.broker_connection_retry = p
|
|
|
pp('+ BLUEPRINT RESTART')
|
|
|
- l.blueprint.restart(l)
|
|
|
+ c.blueprint.restart(c)
|
|
|
pp('- BLUEPRINT RESTART')
|
|
|
- l.event_dispatcher = mock_event_dispatcher()
|
|
|
pp('+ GET ON MESSAGE')
|
|
|
- callback = self._get_on_message(l)
|
|
|
+ callback = self._get_on_message(c)
|
|
|
pp('- GET ON MESSAGE')
|
|
|
pp('+ CALLBACK')
|
|
|
callback(m)
|
|
|
pp('- CALLBACK')
|
|
|
finally:
|
|
|
pp('+ STOP TIMER')
|
|
|
- l.timer.stop()
|
|
|
+ c.timer.stop()
|
|
|
pp('- STOP TIMER')
|
|
|
try:
|
|
|
pp('+ JOIN TIMER')
|
|
|
- l.timer.join()
|
|
|
+ c.timer.join()
|
|
|
pp('- JOIN TIMER')
|
|
|
except RuntimeError:
|
|
|
pass
|
|
|
|
|
|
- in_hold = l.timer.queue[0]
|
|
|
+ in_hold = c.timer.queue[0]
|
|
|
self.assertEqual(len(in_hold), 3)
|
|
|
eta, priority, entry = in_hold
|
|
|
task = entry.args[0]
|
|
@@ -626,36 +539,27 @@ class test_Consumer(AppCase):
|
|
|
self.buffer.get_nowait()
|
|
|
|
|
|
def test_reset_pidbox_node(self):
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- con = find_step(l, consumer.Control).box
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ con = find_step(c, consumer.Control).box
|
|
|
con.node = Mock()
|
|
|
chan = con.node.channel = Mock()
|
|
|
- l.connection = Mock()
|
|
|
chan.close.side_effect = socket.error('foo')
|
|
|
- l.connection_errors = (socket.error,)
|
|
|
+ c.connection_errors = (socket.error,)
|
|
|
con.reset()
|
|
|
chan.close.assert_called_with()
|
|
|
|
|
|
def test_reset_pidbox_node_green(self):
|
|
|
- from celery.worker.pidbox import gPidbox
|
|
|
- pool = Mock()
|
|
|
- pool.is_green = True
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, pool=pool,
|
|
|
- app=self.app)
|
|
|
- con = find_step(l, consumer.Control)
|
|
|
+ c = self.NoopConsumer(pool=Mock(is_green=True))
|
|
|
+ con = find_step(c, consumer.Control)
|
|
|
self.assertIsInstance(con.box, gPidbox)
|
|
|
- con.start(l)
|
|
|
- l.pool.spawn_n.assert_called_with(
|
|
|
- con.box.loop, l,
|
|
|
- )
|
|
|
+ con.start(c)
|
|
|
+ c.pool.spawn_n.assert_called_with(con.box.loop, c)
|
|
|
|
|
|
- def test__green_pidbox_node(self):
|
|
|
+ def test_green_pidbox_node(self):
|
|
|
pool = Mock()
|
|
|
pool.is_green = True
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, pool=pool,
|
|
|
- app=self.app)
|
|
|
- l.node = Mock()
|
|
|
- controller = find_step(l, consumer.Control)
|
|
|
+ c = self.NoopConsumer(pool=Mock(is_green=True))
|
|
|
+ controller = find_step(c, consumer.Control)
|
|
|
|
|
|
class BConsumer(Mock):
|
|
|
|
|
@@ -700,40 +604,33 @@ class test_Consumer(AppCase):
|
|
|
def close(self):
|
|
|
self.closed = True
|
|
|
|
|
|
- l.connection = Mock()
|
|
|
- l.connect = lambda: Connection(obj=l)
|
|
|
- controller = find_step(l, consumer.Control)
|
|
|
- controller.box.loop(l)
|
|
|
+ c.connect = lambda: Connection(obj=c)
|
|
|
+ controller = find_step(c, consumer.Control)
|
|
|
+ controller.box.loop(c)
|
|
|
|
|
|
controller.box.node.listen.assert_called()
|
|
|
self.assertTrue(controller.box.consumer)
|
|
|
controller.box.consumer.consume.assert_called_with()
|
|
|
|
|
|
- self.assertIsNone(l.connection)
|
|
|
+ self.assertIsNone(c.connection)
|
|
|
self.assertTrue(connections[0].closed)
|
|
|
|
|
|
@patch('kombu.connection.Connection._establish_connection')
|
|
|
@patch('kombu.utils.sleep')
|
|
|
def test_connect_errback(self, sleep, connect):
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- from kombu.transport.memory import Transport
|
|
|
+ c = self.NoopConsumer()
|
|
|
Transport.connection_errors = (ChannelError,)
|
|
|
-
|
|
|
- def effect():
|
|
|
- if connect.call_count > 1:
|
|
|
- return
|
|
|
- raise ChannelError('error')
|
|
|
- connect.side_effect = effect
|
|
|
- l.connect()
|
|
|
+ connect.on_nth_call_do(ChannelError('error'), n=1)
|
|
|
+ c.connect()
|
|
|
connect.assert_called_with()
|
|
|
|
|
|
def test_stop_pidbox_node(self):
|
|
|
- l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- cont = find_step(l, consumer.Control)
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ cont = find_step(c, consumer.Control)
|
|
|
cont._node_stopped = Event()
|
|
|
cont._node_shutdown = Event()
|
|
|
cont._node_stopped.set()
|
|
|
- cont.stop(l)
|
|
|
+ cont.stop(c)
|
|
|
|
|
|
def test_start__loop(self):
|
|
|
|
|
@@ -744,67 +641,47 @@ class test_Consumer(AppCase):
|
|
|
def update(self):
|
|
|
self.prev = self.value
|
|
|
|
|
|
- class _Consumer(MyKombuConsumer):
|
|
|
- iterations = 0
|
|
|
-
|
|
|
- def reset_connection(self):
|
|
|
- if self.iterations >= 1:
|
|
|
- raise KeyError('foo')
|
|
|
-
|
|
|
- init_callback = Mock()
|
|
|
- l = _Consumer(self.buffer.put, timer=self.timer,
|
|
|
- init_callback=init_callback, app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
- l.task_consumer = Mock()
|
|
|
- l.broadcast_consumer = Mock()
|
|
|
- l.qos = _QoS()
|
|
|
- l.connection = Connection()
|
|
|
- l.iterations = 0
|
|
|
+ init_callback = Mock(name='init_callback')
|
|
|
+ c = self.NoopConsumer(init_callback=init_callback)
|
|
|
+ c.qos = _QoS()
|
|
|
+ c.connection = Connection()
|
|
|
+ c.iterations = 0
|
|
|
|
|
|
def raises_KeyError(*args, **kwargs):
|
|
|
- l.iterations += 1
|
|
|
- if l.qos.prev != l.qos.value:
|
|
|
- l.qos.update()
|
|
|
- if l.iterations >= 2:
|
|
|
+ c.iterations += 1
|
|
|
+ if c.qos.prev != c.qos.value:
|
|
|
+ c.qos.update()
|
|
|
+ if c.iterations >= 2:
|
|
|
raise KeyError('foo')
|
|
|
|
|
|
- l.loop = raises_KeyError
|
|
|
+ c.loop = raises_KeyError
|
|
|
with self.assertRaises(KeyError):
|
|
|
- l.start()
|
|
|
- self.assertEqual(l.iterations, 2)
|
|
|
- self.assertEqual(l.qos.prev, l.qos.value)
|
|
|
+ c.start()
|
|
|
+ self.assertEqual(c.iterations, 2)
|
|
|
+ self.assertEqual(c.qos.prev, c.qos.value)
|
|
|
|
|
|
init_callback.reset_mock()
|
|
|
- l = _Consumer(self.buffer.put, timer=self.timer, app=self.app,
|
|
|
- send_events=False, init_callback=init_callback)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
- l.qos = _QoS()
|
|
|
- l.task_consumer = Mock()
|
|
|
- l.broadcast_consumer = Mock()
|
|
|
- l.connection = Connection()
|
|
|
- l.loop = Mock(side_effect=socket.error('foo'))
|
|
|
+ c = self.NoopConsumer(send_events=False, init_callback=init_callback)
|
|
|
+ c.qos = _QoS()
|
|
|
+ c.connection = Connection()
|
|
|
+ c.loop = Mock(side_effect=socket.error('foo'))
|
|
|
with self.assertRaises(socket.error):
|
|
|
- l.start()
|
|
|
- self.assertTrue(l.loop.call_count)
|
|
|
+ c.start()
|
|
|
+ c.loop.assert_called()
|
|
|
|
|
|
def test_reset_connection_with_no_node(self):
|
|
|
- l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
|
|
|
- l.controller = l.app.WorkController()
|
|
|
- l.pool = l.controller.pool = Mock()
|
|
|
- l.steps.pop()
|
|
|
- l.blueprint.start(l)
|
|
|
+ c = self.NoopConsumer()
|
|
|
+ c.steps.pop()
|
|
|
+ c.blueprint.start(c)
|
|
|
|
|
|
|
|
|
class test_WorkController(AppCase):
|
|
|
|
|
|
def setup(self):
|
|
|
self.worker = self.create_worker()
|
|
|
- from celery import worker
|
|
|
- self._logger = worker.logger
|
|
|
+ self._logger = worker_module.logger
|
|
|
self._comp_logger = components.logger
|
|
|
- self.logger = worker.logger = Mock()
|
|
|
+ self.logger = worker_module.logger = Mock()
|
|
|
self.comp_logger = components.logger = Mock()
|
|
|
|
|
|
@self.app.task(shared=False)
|
|
@@ -813,8 +690,7 @@ class test_WorkController(AppCase):
|
|
|
self.foo_task = foo_task
|
|
|
|
|
|
def teardown(self):
|
|
|
- from celery import worker
|
|
|
- worker.logger = self._logger
|
|
|
+ worker_module.logger = self._logger
|
|
|
components.logger = self._comp_logger
|
|
|
|
|
|
def create_worker(self, **kw):
|
|
@@ -854,7 +730,6 @@ class test_WorkController(AppCase):
|
|
|
from celery.concurrency.prefork import process_destructor
|
|
|
from celery.concurrency.asynpool import Worker
|
|
|
with patch('celery.signals.worker_process_shutdown') as ws:
|
|
|
- Worker._make_shortcuts = Mock()
|
|
|
with patch('os._exit') as _exit:
|
|
|
worker = Worker(None, None, on_exit=process_destructor)
|
|
|
worker._do_exit(22, 3.1415926)
|