|
@@ -15,6 +15,15 @@ from celery.worker.scheduler import Scheduler
|
|
from celery.decorators import task as task_dec
|
|
from celery.decorators import task as task_dec
|
|
|
|
|
|
|
|
|
|
|
|
+class MockEventDispatcher(object):
|
|
|
|
+
|
|
|
|
+ def send(self, *args, **kwargs):
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+ def close(self):
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+
|
|
@task_dec()
|
|
@task_dec()
|
|
def foo_task(x, y, z, **kwargs):
|
|
def foo_task(x, y, z, **kwargs):
|
|
return x * y * z
|
|
return x * y * z
|
|
@@ -92,7 +101,8 @@ class TestCarrotListener(unittest.TestCase):
|
|
self.logger.setLevel(0)
|
|
self.logger.setLevel(0)
|
|
|
|
|
|
def test_connection(self):
|
|
def test_connection(self):
|
|
- l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
|
|
|
|
|
|
+ l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
|
|
|
|
+ send_events=False)
|
|
|
|
|
|
c = l.reset_connection()
|
|
c = l.reset_connection()
|
|
self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
|
|
self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
|
|
@@ -109,11 +119,13 @@ class TestCarrotListener(unittest.TestCase):
|
|
self.assertTrue(l.task_consumer is None)
|
|
self.assertTrue(l.task_consumer is None)
|
|
|
|
|
|
def test_receieve_message(self):
|
|
def test_receieve_message(self):
|
|
- l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
|
|
|
|
|
|
+ l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
|
|
|
|
+ send_events=False)
|
|
backend = MockBackend()
|
|
backend = MockBackend()
|
|
m = create_message(backend, task=foo_task.name,
|
|
m = create_message(backend, task=foo_task.name,
|
|
args=[2, 4, 8], kwargs={})
|
|
args=[2, 4, 8], kwargs={})
|
|
|
|
|
|
|
|
+ l.event_dispatcher = MockEventDispatcher()
|
|
l.receive_message(m.decode(), m)
|
|
l.receive_message(m.decode(), m)
|
|
|
|
|
|
in_bucket = self.ready_queue.get_nowait()
|
|
in_bucket = self.ready_queue.get_nowait()
|
|
@@ -123,22 +135,26 @@ class TestCarrotListener(unittest.TestCase):
|
|
self.assertTrue(self.eta_scheduler.empty())
|
|
self.assertTrue(self.eta_scheduler.empty())
|
|
|
|
|
|
def test_receieve_message_not_registered(self):
|
|
def test_receieve_message_not_registered(self):
|
|
- l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
|
|
|
|
|
|
+ l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
|
|
|
|
+ send_events=False)
|
|
backend = MockBackend()
|
|
backend = MockBackend()
|
|
m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
|
|
m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
|
|
|
|
|
|
|
|
+ l.event_dispatcher = MockEventDispatcher()
|
|
self.assertFalse(l.receive_message(m.decode(), m))
|
|
self.assertFalse(l.receive_message(m.decode(), m))
|
|
self.assertRaises(Empty, self.ready_queue.get_nowait)
|
|
self.assertRaises(Empty, self.ready_queue.get_nowait)
|
|
self.assertTrue(self.eta_scheduler.empty())
|
|
self.assertTrue(self.eta_scheduler.empty())
|
|
|
|
|
|
def test_receieve_message_eta(self):
|
|
def test_receieve_message_eta(self):
|
|
- l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
|
|
|
|
|
|
+ l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger,
|
|
|
|
+ send_events=False)
|
|
backend = MockBackend()
|
|
backend = MockBackend()
|
|
m = create_message(backend, task=foo_task.name,
|
|
m = create_message(backend, task=foo_task.name,
|
|
args=[2, 4, 8], kwargs={},
|
|
args=[2, 4, 8], kwargs={},
|
|
eta=(datetime.now() +
|
|
eta=(datetime.now() +
|
|
timedelta(days=1)).isoformat())
|
|
timedelta(days=1)).isoformat())
|
|
|
|
|
|
|
|
+ l.reset_connection()
|
|
l.receive_message(m.decode(), m)
|
|
l.receive_message(m.decode(), m)
|
|
|
|
|
|
in_hold = self.eta_scheduler.queue[0]
|
|
in_hold = self.eta_scheduler.queue[0]
|