|
@@ -21,7 +21,7 @@ from celery.worker import WorkController
|
|
|
from celery.worker.buckets import FastQueue
|
|
|
from celery.worker.job import TaskRequest
|
|
|
from celery.worker.consumer import Consumer as MainConsumer
|
|
|
-from celery.worker.consumer import QoS, RUN, PREFETCH_COUNT_MAX
|
|
|
+from celery.worker.consumer import QoS, RUN, PREFETCH_COUNT_MAX, CLOSE
|
|
|
from celery.utils.serialization import pickle
|
|
|
from celery.utils.timer2 import Timer
|
|
|
|
|
@@ -181,6 +181,21 @@ class test_QoS(unittest.TestCase):
|
|
|
qos.increment()
|
|
|
self.assertEqual(qos.value, 0)
|
|
|
|
|
|
+ def test_consumer_decrement_eventually(self):
|
|
|
+ consumer = Mock()
|
|
|
+ qos = QoS(consumer, 10, current_app.log.get_default_logger())
|
|
|
+ qos.decrement_eventually()
|
|
|
+ self.assertEqual(qos.value, 9)
|
|
|
+ qos.value = 0
|
|
|
+ qos.decrement_eventually()
|
|
|
+ self.assertEqual(qos.value, 0)
|
|
|
+
|
|
|
+ def test_set(self):
|
|
|
+ consumer = Mock()
|
|
|
+ qos = QoS(consumer, 10, current_app.log.get_default_logger())
|
|
|
+ qos.set(12)
|
|
|
+ self.assertEqual(qos.prev, 12)
|
|
|
+ qos.set(qos.prev)
|
|
|
|
|
|
class test_Consumer(unittest.TestCase):
|
|
|
|
|
@@ -205,6 +220,12 @@ class test_Consumer(unittest.TestCase):
|
|
|
info = l.info
|
|
|
self.assertTrue(info["broker"])
|
|
|
|
|
|
+ def test_start_when_closed(self):
|
|
|
+ l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ l._state = CLOSE
|
|
|
+ l.start()
|
|
|
+
|
|
|
def test_connection(self):
|
|
|
l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
send_events=False)
|
|
@@ -338,6 +359,46 @@ class test_Consumer(unittest.TestCase):
|
|
|
l.heart.stop()
|
|
|
l.priority_timer.stop()
|
|
|
|
|
|
+ def test_consume_messages_ignores_socket_timeout(self):
|
|
|
+
|
|
|
+ class Connection(current_app.broker_connection().__class__):
|
|
|
+ obj = None
|
|
|
+
|
|
|
+ def drain_events(self, **kwargs):
|
|
|
+ self.obj.connection = None
|
|
|
+ raise socket.timeout(10)
|
|
|
+
|
|
|
+ l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ l.connection = Connection()
|
|
|
+ l.task_consumer = Mock()
|
|
|
+ l.connection.obj = l
|
|
|
+ l.qos = QoS(l.task_consumer, 10, l.logger)
|
|
|
+ l.consume_messages()
|
|
|
+
|
|
|
+ def test_consume_messages_when_socket_error(self):
|
|
|
+
|
|
|
+ class Connection(current_app.broker_connection().__class__):
|
|
|
+ obj = None
|
|
|
+
|
|
|
+ def drain_events(self, **kwargs):
|
|
|
+ self.obj.connection = None
|
|
|
+ raise socket.error("foo")
|
|
|
+
|
|
|
+ l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ l._state = RUN
|
|
|
+ c = l.connection = Connection()
|
|
|
+ l.connection.obj = l
|
|
|
+ l.task_consumer = Mock()
|
|
|
+ l.qos = QoS(l.task_consumer, 10, l.logger)
|
|
|
+ with self.assertRaises(socket.error):
|
|
|
+ l.consume_messages()
|
|
|
+
|
|
|
+ l._state = CLOSE
|
|
|
+ l.connection = c
|
|
|
+ l.consume_messages()
|
|
|
+
|
|
|
def test_consume_messages(self):
|
|
|
|
|
|
class Connection(current_app.broker_connection().__class__):
|
|
@@ -395,6 +456,7 @@ class test_Consumer(unittest.TestCase):
|
|
|
l.task_consumer = Mock()
|
|
|
l.qos = QoS(l.task_consumer, l.initial_prefetch_count, l.logger)
|
|
|
l.event_dispatcher = Mock()
|
|
|
+ l.enabled = False
|
|
|
l.receive_message(m.decode(), m)
|
|
|
l.eta_schedule.stop()
|
|
|
|
|
@@ -407,6 +469,26 @@ class test_Consumer(unittest.TestCase):
|
|
|
self.assertTrue(l.task_consumer.qos.call_count)
|
|
|
l.eta_schedule.stop()
|
|
|
|
|
|
+ def test_on_control(self):
|
|
|
+ l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ l.pidbox_node = Mock()
|
|
|
+ l.reset_pidbox_node = Mock()
|
|
|
+
|
|
|
+ l.on_control("foo", "bar")
|
|
|
+ l.pidbox_node.handle_message.assert_called_with("foo", "bar")
|
|
|
+
|
|
|
+ l.pidbox_node = Mock()
|
|
|
+ l.pidbox_node.handle_message.side_effect = KeyError("foo")
|
|
|
+ l.on_control("foo", "bar")
|
|
|
+ l.pidbox_node.handle_message.assert_called_with("foo", "bar")
|
|
|
+
|
|
|
+ l.pidbox_node = Mock()
|
|
|
+ l.pidbox_node.handle_message.side_effect = ValueError("foo")
|
|
|
+ l.on_control("foo", "bar")
|
|
|
+ l.pidbox_node.handle_message.assert_called_with("foo", "bar")
|
|
|
+ l.reset_pidbox_node.assert_called_with()
|
|
|
+
|
|
|
def test_revoke(self):
|
|
|
ready_queue = FastQueue()
|
|
|
l = MyKombuConsumer(ready_queue, self.eta_schedule, self.logger,
|
|
@@ -432,6 +514,26 @@ class test_Consumer(unittest.TestCase):
|
|
|
self.assertRaises(Empty, self.ready_queue.get_nowait)
|
|
|
self.assertTrue(self.eta_schedule.empty())
|
|
|
|
|
|
+ def test_receieve_message_ack_raises(self):
|
|
|
+ l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ backend = Mock()
|
|
|
+ m = create_message(backend, args=[2, 4, 8], kwargs={})
|
|
|
+
|
|
|
+ l.event_dispatcher = Mock()
|
|
|
+ l.connection_errors = (socket.error, )
|
|
|
+ l.logger = Mock()
|
|
|
+ m.ack = Mock()
|
|
|
+ m.ack.side_effect = socket.error("foo")
|
|
|
+ with catch_warnings(record=True) as log:
|
|
|
+ self.assertFalse(l.receive_message(m.decode(), m))
|
|
|
+ self.assertTrue(log)
|
|
|
+ self.assertIn("unknown message", log[0].message.args[0])
|
|
|
+ self.assertRaises(Empty, self.ready_queue.get_nowait)
|
|
|
+ self.assertTrue(self.eta_schedule.empty())
|
|
|
+ m.ack.assert_called_with()
|
|
|
+ self.assertTrue(l.logger.critical.call_count)
|
|
|
+
|
|
|
def test_receieve_message_eta(self):
|
|
|
l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
send_events=False)
|
|
@@ -463,6 +565,59 @@ class test_Consumer(unittest.TestCase):
|
|
|
self.assertEqual(task.execute(), 2 * 4 * 8)
|
|
|
self.assertRaises(Empty, self.ready_queue.get_nowait)
|
|
|
|
|
|
+ def test_reset_pidbox_node(self):
|
|
|
+ l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ l.pidbox_node = Mock()
|
|
|
+ chan = l.pidbox_node.channel = Mock()
|
|
|
+ l.connection = Mock()
|
|
|
+ chan.close.side_effect = socket.error("foo")
|
|
|
+ l.connection_errors = (socket.error, )
|
|
|
+ l.reset_pidbox_node()
|
|
|
+ chan.close.assert_called_with()
|
|
|
+
|
|
|
+ def test_reset_pidbox_node_green(self):
|
|
|
+ l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ l.pool = Mock()
|
|
|
+ l.pool.is_green = True
|
|
|
+ l.reset_pidbox_node()
|
|
|
+ l.pool.spawn_n.assert_called_with(l._green_pidbox_node)
|
|
|
+
|
|
|
+ def test__green_pidbox_node(self):
|
|
|
+ l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ l.pidbox_node = Mock()
|
|
|
+
|
|
|
+ connections = []
|
|
|
+
|
|
|
+ class Connection(object):
|
|
|
+
|
|
|
+ def __init__(self, obj):
|
|
|
+ connections.append(self)
|
|
|
+ self.obj = obj
|
|
|
+ self.closed = False
|
|
|
+
|
|
|
+ def channel(self):
|
|
|
+ return Mock()
|
|
|
+
|
|
|
+ def drain_events(self):
|
|
|
+ self.obj.connection = None
|
|
|
+
|
|
|
+ def close(self):
|
|
|
+ self.closed = True
|
|
|
+
|
|
|
+ l.connection = Mock()
|
|
|
+ l._open_connection = lambda: Connection(obj=l)
|
|
|
+ l._green_pidbox_node()
|
|
|
+
|
|
|
+ l.pidbox_node.listen.assert_called_with(callback=l.on_control)
|
|
|
+ self.assertTrue(l.broadcast_consumer)
|
|
|
+ l.broadcast_consumer.consume.assert_called_with()
|
|
|
+
|
|
|
+ self.assertIsNone(l.connection)
|
|
|
+ self.assertTrue(connections[0].closed)
|
|
|
+
|
|
|
def test_start__consume_messages(self):
|
|
|
|
|
|
class _QoS(object):
|