|
@@ -9,6 +9,7 @@ from carrot.backends.base import BaseMessage
|
|
|
from celery import registry
|
|
|
from celery.serialization import pickle
|
|
|
from celery.utils import gen_unique_id
|
|
|
+from celery.scheduler import Scheduler
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
|
|
@@ -84,12 +85,12 @@ class TestAMQPListener(unittest.TestCase):
|
|
|
|
|
|
def setUp(self):
|
|
|
self.bucket_queue = Queue()
|
|
|
- self.hold_queue = Queue()
|
|
|
+ self.eta_scheduler = Scheduler(self.bucket_queue)
|
|
|
self.logger = get_logger()
|
|
|
self.logger.setLevel(0)
|
|
|
|
|
|
def test_connection(self):
|
|
|
- l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
|
|
|
+ l = AMQPListener(self.bucket_queue, self.eta_scheduler, self.logger)
|
|
|
|
|
|
c = l.reset_connection()
|
|
|
self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
|
|
@@ -106,7 +107,7 @@ class TestAMQPListener(unittest.TestCase):
|
|
|
self.assertTrue(l.task_consumer is None)
|
|
|
|
|
|
def test_receieve_message(self):
|
|
|
- l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
|
|
|
+ l = AMQPListener(self.bucket_queue, self.eta_scheduler, self.logger)
|
|
|
backend = MockBackend()
|
|
|
m = create_message(backend, task="c.u.foo", args=[2, 4, 8], kwargs={})
|
|
|
|
|
@@ -116,30 +117,29 @@ class TestAMQPListener(unittest.TestCase):
|
|
|
self.assertTrue(isinstance(in_bucket, TaskWrapper))
|
|
|
self.assertEquals(in_bucket.task_name, "c.u.foo")
|
|
|
self.assertEquals(in_bucket.execute(), 2 * 4 * 8)
|
|
|
- self.assertRaises(Empty, self.hold_queue.get_nowait)
|
|
|
+ self.assertTrue(self.eta_scheduler.empty())
|
|
|
|
|
|
def test_receieve_message_not_registered(self):
|
|
|
- l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
|
|
|
+ l = AMQPListener(self.bucket_queue, self.eta_scheduler, self.logger)
|
|
|
backend = MockBackend()
|
|
|
m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
|
|
|
|
|
|
self.assertFalse(l.receive_message(m.decode(), m))
|
|
|
self.assertRaises(Empty, self.bucket_queue.get_nowait)
|
|
|
- self.assertRaises(Empty, self.hold_queue.get_nowait)
|
|
|
+ self.assertTrue(self.eta_scheduler.empty())
|
|
|
|
|
|
def test_receieve_message_eta(self):
|
|
|
- l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
|
|
|
+ l = AMQPListener(self.bucket_queue, self.eta_scheduler, self.logger)
|
|
|
backend = MockBackend()
|
|
|
m = create_message(backend, task="c.u.foo", args=[2, 4, 8], kwargs={},
|
|
|
eta=datetime.now() + timedelta(days=1))
|
|
|
|
|
|
l.receive_message(m.decode(), m)
|
|
|
|
|
|
- in_hold = self.hold_queue.get_nowait()
|
|
|
- self.assertEquals(len(in_hold), 3)
|
|
|
- task, eta, on_accept = in_hold
|
|
|
+ in_hold = self.eta_scheduler.queue[0]
|
|
|
+ self.assertEquals(len(in_hold), 4)
|
|
|
+ eta, priority, task, on_accept = in_hold
|
|
|
self.assertTrue(isinstance(task, TaskWrapper))
|
|
|
- self.assertTrue(isinstance(eta, datetime))
|
|
|
self.assertTrue(callable(on_accept))
|
|
|
self.assertEquals(task.task_name, "c.u.foo")
|
|
|
self.assertEquals(task.execute(), 2 * 4 * 8)
|
|
@@ -156,8 +156,8 @@ class TestWorkController(unittest.TestCase):
|
|
|
def test_attrs(self):
|
|
|
worker = self.worker
|
|
|
self.assertTrue(isinstance(worker.bucket_queue, Queue))
|
|
|
- self.assertTrue(isinstance(worker.hold_queue, Queue))
|
|
|
- self.assertTrue(worker.periodic_work_controller)
|
|
|
+ self.assertTrue(isinstance(worker.eta_scheduler, Scheduler))
|
|
|
+ self.assertTrue(worker.schedule_controller)
|
|
|
self.assertTrue(worker.pool)
|
|
|
self.assertTrue(worker.amqp_listener)
|
|
|
self.assertTrue(worker.mediator)
|