Browse Source

Renamed bucket_queue -> ready_queue

Ask Solem 15 years ago
parent
commit
d1e0743a1b

+ 10 - 10
celery/tests/test_worker.py

@@ -84,13 +84,13 @@ def create_message(backend, **data):
 class TestCarrotListener(unittest.TestCase):
 
     def setUp(self):
-        self.bucket_queue = Queue()
-        self.eta_scheduler = Scheduler(self.bucket_queue)
+        self.ready_queue = Queue()
+        self.eta_scheduler = Scheduler(self.ready_queue)
         self.logger = get_logger()
         self.logger.setLevel(0)
 
     def test_connection(self):
-        l = CarrotListener(self.bucket_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
 
         c = l.reset_connection()
         self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
@@ -107,29 +107,29 @@ class TestCarrotListener(unittest.TestCase):
         self.assertTrue(l.task_consumer is None)
 
     def test_receieve_message(self):
-        l = CarrotListener(self.bucket_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
         backend = MockBackend()
         m = create_message(backend, task="c.u.foo", args=[2, 4, 8], kwargs={})
 
         l.receive_message(m.decode(), m)
 
-        in_bucket = self.bucket_queue.get_nowait()
+        in_bucket = self.ready_queue.get_nowait()
         self.assertTrue(isinstance(in_bucket, TaskWrapper))
         self.assertEquals(in_bucket.task_name, "c.u.foo")
         self.assertEquals(in_bucket.execute(), 2 * 4 * 8)
         self.assertTrue(self.eta_scheduler.empty())
 
     def test_receieve_message_not_registered(self):
-        l = CarrotListener(self.bucket_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
         backend = MockBackend()
         m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
 
         self.assertFalse(l.receive_message(m.decode(), m))
-        self.assertRaises(Empty, self.bucket_queue.get_nowait)
+        self.assertRaises(Empty, self.ready_queue.get_nowait)
         self.assertTrue(self.eta_scheduler.empty())
 
     def test_receieve_message_eta(self):
-        l = CarrotListener(self.bucket_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
         backend = MockBackend()
         m = create_message(backend, task="c.u.foo", args=[2, 4, 8], kwargs={},
                            eta=datetime.now() + timedelta(days=1))
@@ -143,7 +143,7 @@ class TestCarrotListener(unittest.TestCase):
         self.assertTrue(callable(on_accept))
         self.assertEquals(task.task_name, "c.u.foo")
         self.assertEquals(task.execute(), 2 * 4 * 8)
-        self.assertRaises(Empty, self.bucket_queue.get_nowait)
+        self.assertRaises(Empty, self.ready_queue.get_nowait)
 
 
 class TestWorkController(unittest.TestCase):
@@ -155,7 +155,7 @@ class TestWorkController(unittest.TestCase):
 
     def test_attrs(self):
         worker = self.worker
-        self.assertTrue(isinstance(worker.bucket_queue, Queue))
+        self.assertTrue(isinstance(worker.ready_queue, Queue))
         self.assertTrue(isinstance(worker.eta_scheduler, Scheduler))
         self.assertTrue(worker.schedule_controller)
         self.assertTrue(worker.pool)

+ 5 - 5
celery/tests/test_worker_controllers.py

@@ -48,8 +48,8 @@ class TestBackgroundThread(unittest.TestCase):
 class TestMediator(unittest.TestCase):
 
     def test_mediator_start__stop(self):
-        bucket_queue = Queue()
-        m = Mediator(bucket_queue, lambda t: t)
+        ready_queue = Queue()
+        m = Mediator(ready_queue, lambda t: t)
         m.start()
         self.assertFalse(m._shutdown.isSet())
         self.assertFalse(m._stopped.isSet())
@@ -59,14 +59,14 @@ class TestMediator(unittest.TestCase):
         self.assertTrue(m._stopped.isSet())
 
     def test_mediator_on_iteration(self):
-        bucket_queue = Queue()
+        ready_queue = Queue()
         got = {}
 
         def mycallback(value):
             got["value"] = value.value
 
-        m = Mediator(bucket_queue, mycallback)
-        bucket_queue.put(MockTask("George Constanza"))
+        m = Mediator(ready_queue, mycallback)
+        ready_queue.put(MockTask("George Constanza"))
 
         m.on_iteration()
 

+ 10 - 10
celery/worker/__init__.py

@@ -26,10 +26,10 @@ class CarrotListener(object):
     """Listen for messages received from the AMQP broker and
     move them the the bucket queue for task processing.
 
-    :param bucket_queue: See :attr:`bucket_queue`.
+    :param ready_queue: See :attr:`ready_queue`.
     :param eta_scheduler: See :attr:`eta_scheduler`.
 
-    .. attribute:: bucket_queue
+    .. attribute:: ready_queue
 
         The queue that holds tasks ready for processing immediately.
 
@@ -44,11 +44,11 @@ class CarrotListener(object):
 
     """
 
-    def __init__(self, bucket_queue, eta_scheduler, logger,
+    def __init__(self, ready_queue, eta_scheduler, logger,
             initial_prefetch_count=2):
         self.amqp_connection = None
         self.task_consumer = None
-        self.bucket_queue = bucket_queue
+        self.ready_queue = ready_queue
         self.eta_scheduler = eta_scheduler
         self.logger = logger
         self.prefetch_count = SharedCounter(initial_prefetch_count)
@@ -112,7 +112,7 @@ class CarrotListener(object):
         else:
             self.logger.info("Got task from broker: %s[%s]" % (
                     task.task_name, task.task_id))
-            self.bucket_queue.put(task)
+            self.ready_queue.put(task)
 
     def close_connection(self):
         """Close the AMQP connection."""
@@ -201,7 +201,7 @@ class WorkController(object):
 
         The :class:`multiprocessing.Pool` instance used.
 
-    .. attribute:: bucket_queue
+    .. attribute:: ready_queue
 
         The :class:`Queue.Queue` that holds tasks ready for immediate
         processing.
@@ -242,20 +242,20 @@ class WorkController(object):
         self.embed_clockservice = embed_clockservice
 
         # Queues
-        self.bucket_queue = Queue()
+        self.ready_queue = Queue()
         self.hold_queue = Queue()
-        self.eta_scheduler = Scheduler(self.bucket_queue)
+        self.eta_scheduler = Scheduler(self.ready_queue)
 
         self.logger.debug("Instantiating thread components...")
 
         # Threads+Pool
         self.schedule_controller = ScheduleController(self.eta_scheduler)
         self.pool = TaskPool(self.concurrency, logger=self.logger)
-        self.broker_listener = CarrotListener(self.bucket_queue,
+        self.broker_listener = CarrotListener(self.ready_queue,
                                         self.eta_scheduler,
                                         logger=self.logger,
                                         initial_prefetch_count=concurrency)
-        self.mediator = Mediator(self.bucket_queue, self.safe_process_task)
+        self.mediator = Mediator(self.ready_queue, self.safe_process_task)
 
         self.clockservice = None
         if self.embed_clockservice:

+ 6 - 6
celery/worker/controllers.py

@@ -65,29 +65,29 @@ class BackgroundThread(threading.Thread):
 class Mediator(BackgroundThread):
     """Thread continuously sending tasks in the queue to the pool.
 
-    .. attribute:: bucket_queue
+    .. attribute:: ready_queue
 
         The task queue, a :class:`Queue.Queue` instance.
 
     .. attribute:: callback
 
         The callback used to process tasks retrieved from the
-        :attr:`bucket_queue`.
+        :attr:`ready_queue`.
 
     """
 
-    def __init__(self, bucket_queue, callback):
+    def __init__(self, ready_queue, callback):
         super(Mediator, self).__init__()
-        self.bucket_queue = bucket_queue
+        self.ready_queue = ready_queue
         self.callback = callback
 
     def on_iteration(self):
         """Get tasks from bucket queue and apply the task callback."""
         logger = get_default_logger()
         try:
-            logger.debug("Mediator: Trying to get message from bucket_queue")
+            logger.debug("Mediator: Trying to get message from ready_queue")
             # This blocks until there's a message in the queue.
-            task = self.bucket_queue.get(timeout=1)
+            task = self.ready_queue.get(timeout=1)
         except QueueEmpty:
             logger.debug("Mediator: Bucket queue is empty.")
         else:

+ 4 - 4
celery/worker/scheduler.py

@@ -4,8 +4,8 @@ import time
 
 class Scheduler(object):
 
-    def __init__(self, bucket_queue):
-        self.bucket_queue = bucket_queue
+    def __init__(self, ready_queue):
+        self.ready_queue = ready_queue
         self._queue = []
 
     def enter(self, item, eta=None, priority=0, callback=None):
@@ -19,7 +19,7 @@ class Scheduler(object):
         q = self._queue
         nowfun = time.time
         pop = heapq.heappop
-        bucket = self.bucket_queue
+        ready_queue = self.ready_queue
 
         while True:
             if q:
@@ -34,7 +34,7 @@ class Scheduler(object):
                         eta, priority, item))
 
                     if event is verify:
-                        bucket.put(item)
+                        ready_queue.put(item)
                         callback and callback()
                         yield 0
                     else: