|
@@ -7,21 +7,43 @@ from carrot.connection import BrokerConnection
|
|
|
from carrot.backends.base import BaseMessage
|
|
|
from billiard.serialization import pickle
|
|
|
|
|
|
-from celery.utils import gen_unique_id
|
|
|
-from celery.worker import CarrotListener, WorkController
|
|
|
+from celery import conf
|
|
|
+from celery.utils import gen_unique_id, noop
|
|
|
+from celery.worker import WorkController
|
|
|
+from celery.worker.listener import CarrotListener, RUN, CLOSE
|
|
|
from celery.worker.job import TaskWrapper
|
|
|
from celery.worker.scheduler import Scheduler
|
|
|
from celery.decorators import task as task_dec
|
|
|
from celery.decorators import periodic_task as periodic_task_dec
|
|
|
|
|
|
|
|
|
+class PlaceHolder(object):
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+class MockControlDispatch(object):
|
|
|
+ commands = []
|
|
|
+
|
|
|
+ def dispatch_from_message(self, message):
|
|
|
+ self.commands.append(message.pop("command", None))
|
|
|
+
|
|
|
+
|
|
|
class MockEventDispatcher(object):
|
|
|
+ sent = []
|
|
|
+ closed = False
|
|
|
|
|
|
- def send(self, *args, **kwargs):
|
|
|
- pass
|
|
|
+ def send(self, event, *args, **kwargs):
|
|
|
+ self.sent.append(event)
|
|
|
|
|
|
def close(self):
|
|
|
- pass
|
|
|
+ self.closed = True
|
|
|
+
|
|
|
+
|
|
|
+class MockHeart(object):
|
|
|
+ closed = False
|
|
|
+
|
|
|
+ def stop(self):
|
|
|
+ self.closed = True
|
|
|
|
|
|
|
|
|
@task_dec()
|
|
@@ -105,6 +127,47 @@ class TestCarrotListener(unittest.TestCase):
|
|
|
self.logger = get_logger()
|
|
|
self.logger.setLevel(0)
|
|
|
|
|
|
+ def test_mainloop(self):
|
|
|
+ l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+
|
|
|
+ class MockConnection(object):
|
|
|
+
|
|
|
+ def drain_events(self):
|
|
|
+ return "draining"
|
|
|
+
|
|
|
+ l.connection = PlaceHolder()
|
|
|
+ l.connection.connection = MockConnection()
|
|
|
+
|
|
|
+ it = l._mainloop()
|
|
|
+ self.assertTrue(it.next(), "draining")
|
|
|
+
|
|
|
+ records = {}
|
|
|
+ def create_recorder(key):
|
|
|
+ def _recorder(*args, **kwargs):
|
|
|
+ records[key] = True
|
|
|
+ return _recorder
|
|
|
+
|
|
|
+ l.task_consumer = PlaceHolder()
|
|
|
+ l.task_consumer.iterconsume = create_recorder("consume_tasks")
|
|
|
+ l.broadcast_consumer = PlaceHolder()
|
|
|
+ l.broadcast_consumer.register_callback = create_recorder(
|
|
|
+ "broadcast_callback")
|
|
|
+ l.broadcast_consumer.iterconsume = create_recorder(
|
|
|
+ "consume_broadcast")
|
|
|
+ l.task_consumer.add_consumer = create_recorder("consumer_add")
|
|
|
+
|
|
|
+ records.clear()
|
|
|
+ self.assertEquals(l._detect_wait_method(), l._mainloop)
|
|
|
+ self.assertTrue(records.get("broadcast_callback"))
|
|
|
+ self.assertTrue(records.get("consume_broadcast"))
|
|
|
+ self.assertTrue(records.get("consume_tasks"))
|
|
|
+
|
|
|
+ records.clear()
|
|
|
+ l.connection.connection = PlaceHolder()
|
|
|
+ self.assertTrue(l._detect_wait_method() is l.task_consumer.iterconsume)
|
|
|
+ self.assertTrue(records.get("consumer_add"))
|
|
|
+
|
|
|
def test_connection(self):
|
|
|
l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
send_events=False)
|
|
@@ -123,6 +186,44 @@ class TestCarrotListener(unittest.TestCase):
|
|
|
self.assertTrue(l.connection is None)
|
|
|
self.assertTrue(l.task_consumer is None)
|
|
|
|
|
|
+ def test_receive_message_control_command(self):
|
|
|
+ l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ backend = MockBackend()
|
|
|
+ m = create_message(backend, control={"command": "shutdown"})
|
|
|
+ l.event_dispatcher = MockEventDispatcher()
|
|
|
+ l.control_dispatch = MockControlDispatch()
|
|
|
+ l.receive_message(m.decode(), m)
|
|
|
+ self.assertTrue("shutdown" in l.control_dispatch.commands)
|
|
|
+
|
|
|
+ def test_close_connection(self):
|
|
|
+ l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ l._state = RUN
|
|
|
+ l.close_connection()
|
|
|
+
|
|
|
+ l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ eventer = l.event_dispatcher = MockEventDispatcher()
|
|
|
+ heart = l.heart = MockHeart()
|
|
|
+ l._state = RUN
|
|
|
+ l.close_connection()
|
|
|
+ self.assertTrue(eventer.closed)
|
|
|
+ self.assertTrue(heart.closed)
|
|
|
+
|
|
|
+ def test_receive_message_unknown(self):
|
|
|
+ l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
+ send_events=False)
|
|
|
+ backend = MockBackend()
|
|
|
+ m = create_message(backend, unknown={"baz": "!!!"})
|
|
|
+ l.event_dispatcher = MockEventDispatcher()
|
|
|
+ l.control_dispatch = MockControlDispatch()
|
|
|
+ import warnings
|
|
|
+ with warnings.catch_warnings(record=True) as log:
|
|
|
+ l.receive_message(m.decode(), m)
|
|
|
+ self.assertTrue(log)
|
|
|
+ self.assertTrue("unknown message" in log[0].message.args[0])
|
|
|
+
|
|
|
def test_receieve_message(self):
|
|
|
l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
|
|
|
send_events=False)
|
|
@@ -196,6 +297,11 @@ class TestCarrotListener(unittest.TestCase):
|
|
|
timedelta(days=1)).isoformat())
|
|
|
|
|
|
l.reset_connection()
|
|
|
+ p, conf.BROKER_CONNECTION_RETRY = conf.BROKER_CONNECTION_RETRY, False
|
|
|
+ try:
|
|
|
+ l.reset_connection()
|
|
|
+ finally:
|
|
|
+ conf.BROKER_CONNECTION_RETRY = p
|
|
|
l.receive_message(m.decode(), m)
|
|
|
|
|
|
in_hold = self.eta_schedule.queue[0]
|