Browse Source

And another file I forgot to commit :(

Ask Solem 15 years ago
parent
commit
9c91935a64
1 changed files with 17 additions and 14 deletions
  1. 17 14
      celery/worker/__init__.py

+ 17 - 14
celery/worker/__init__.py

@@ -4,7 +4,7 @@ The Multiprocessing Worker Server
 
 """
 from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
-from celery.worker.controllers import Mediator, PeriodicWorkController
+from celery.worker.controllers import Mediator, ScheduleController
 from celery.beat import ClockServiceThread
 from celery.worker.job import TaskWrapper
 from celery.exceptions import NotRegistered
@@ -15,6 +15,7 @@ from celery.log import setup_logger
 from celery.pool import TaskPool
 from celery.utils import retry_over_time
 from celery.datastructures import SharedCounter
+from celery.scheduler import Scheduler
 from Queue import Queue
 import traceback
 import logging
@@ -26,15 +27,15 @@ class AMQPListener(object):
     move them the the bucket queue for task processing.
 
     :param bucket_queue: See :attr:`bucket_queue`.
-    :param hold_queue: See :attr:`hold_queue`.
+    :param eta_scheduler: See :attr:`eta_scheduler`.
 
     .. attribute:: bucket_queue
 
         The queue that holds tasks ready for processing immediately.
 
-    .. attribute:: hold_queue
+    .. attribute:: eta_scheduler
 
-        The queue that holds paused tasks. Reasons for being paused include
+        Scheduler for paused tasks. Reasons for being paused include
         a countdown/eta or that it's waiting for retry.
 
     .. attribute:: logger
@@ -43,12 +44,12 @@ class AMQPListener(object):
 
     """
 
-    def __init__(self, bucket_queue, hold_queue, logger,
+    def __init__(self, bucket_queue, eta_scheduler, logger,
             initial_prefetch_count=2):
         self.amqp_connection = None
         self.task_consumer = None
         self.bucket_queue = bucket_queue
-        self.hold_queue = hold_queue
+        self.eta_scheduler = eta_scheduler
         self.logger = logger
         self.prefetch_count = SharedCounter(initial_prefetch_count)
 
@@ -105,7 +106,9 @@ class AMQPListener(object):
             self.prefetch_count.increment()
             self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
                     task.task_name, task.task_id, eta))
-            self.hold_queue.put((task, eta, self.prefetch_count.decrement))
+            self.eta_scheduler.enter(task,
+                                     eta=eta,
+                                     callback=self.prefetch_count.decrement)
         else:
             self.logger.info("Got task from broker: %s[%s]" % (
                     task.task_name, task.task_id))
@@ -209,9 +212,9 @@ class WorkController(object):
         back the task include waiting for ``eta`` to pass or the task is being
         retried.
 
-    .. attribute:: periodic_work_controller
+    .. attribute:: schedule_controller
 
-        Instance of :class:`celery.worker.controllers.PeriodicWorkController`.
+        Instance of :class:`celery.worker.controllers.ScheduleController`.
 
     .. attribute:: mediator
 
@@ -241,15 +244,15 @@ class WorkController(object):
         # Queues
         self.bucket_queue = Queue()
         self.hold_queue = Queue()
+        self.eta_scheduler = Scheduler(self.bucket_queue)
 
         self.logger.debug("Instantiating thread components...")
 
         # Threads+Pool
-        self.periodic_work_controller = PeriodicWorkController(
-                                                    self.bucket_queue,
-                                                    self.hold_queue)
+        self.schedule_controller = ScheduleController(self.eta_scheduler)
         self.pool = TaskPool(self.concurrency, logger=self.logger)
-        self.amqp_listener = AMQPListener(self.bucket_queue, self.hold_queue,
+        self.amqp_listener = AMQPListener(self.bucket_queue,
+                                          self.eta_scheduler,
                                           logger=self.logger,
                                           initial_prefetch_count=concurrency)
         self.mediator = Mediator(self.bucket_queue, self.safe_process_task)
@@ -264,7 +267,7 @@ class WorkController(object):
         # and they must be stopped in reverse order.
         self.components = filter(None, (self.pool,
                                         self.mediator,
-                                        self.periodic_work_controller,
+                                        self.schedule_controller,
                                         self.clockservice,
                                         self.amqp_listener))