Parcourir la source

Merge branch 'beat' into 1point0

Conflicts:
	celery/tests/test_models.py
	celery/tests/test_worker.py
	celery/worker/__init__.py
Ask Solem il y a 15 ans
Parent
commit
2fd2780b91

+ 0 - 12
celery/tests/test_backends/test_database.py

@@ -3,7 +3,6 @@ from celery.backends.database import Backend
 from celery.utils import gen_unique_id
 from celery.task import PeriodicTask
 from celery import registry
-from celery.models import PeriodicTaskMeta
 from datetime import datetime, timedelta
 
 
@@ -23,17 +22,6 @@ class MyPeriodicTask(PeriodicTask):
 
 class TestDatabaseBackend(unittest.TestCase):
 
-    def test_run_periodic_tasks(self):
-        #obj, created = PeriodicTaskMeta.objects.get_or_create(
-        #                    name=MyPeriodicTask.name,
-        #                    defaults={"last_run_at": datetime.now() -
-        #                        timedelta(days=-4)})
-        #if not created:
-        #    obj.last_run_at = datetime.now() - timedelta(days=4)
-        #    obj.save()
-        b = Backend()
-        b.run_periodic_tasks()
-
     def test_backend(self):
         b = Backend()
         tid = gen_unique_id()

+ 0 - 25
celery/tests/test_models.py

@@ -1,16 +1,9 @@
 import unittest
 from datetime import datetime, timedelta
 from celery.models import TaskMeta
-from celery.task import PeriodicTask
-from celery.registry import tasks
 from celery.utils import gen_unique_id
 
 
-class TestPeriodicTask(PeriodicTask):
-    name = "celery.unittest.test_models.test_periodic_task"
-    run_every = timedelta(minutes=30)
-
-
 class TestModels(unittest.TestCase):
 
     def createTaskMeta(self):
@@ -18,11 +11,6 @@ class TestModels(unittest.TestCase):
         taskmeta, created = TaskMeta.objects.get_or_create(task_id=id)
         return taskmeta
 
-    def createPeriodicTaskMeta(self, name):
-        ptaskmeta, created = PeriodicTaskMeta.objects.get_or_create(name=name,
-                defaults={"last_run_at": datetime.now()})
-        return ptaskmeta
-
     def test_taskmeta(self):
         m1 = self.createTaskMeta()
         m2 = self.createTaskMeta()
@@ -50,16 +38,3 @@ class TestModels(unittest.TestCase):
 
         TaskMeta.objects.delete_expired()
         self.assertFalse(m1 in TaskMeta.objects.all())
-
-    def test_periodic_taskmeta(self):
-        p = self.createPeriodicTaskMeta(TestPeriodicTask.name)
-        # check that repr works.
-        self.assertTrue(unicode(p).startswith("<PeriodicTask:"))
-        self.assertFalse(p in PeriodicTaskMeta.objects.get_waiting_tasks())
-        p.last_run_at = datetime.now() - (TestPeriodicTask.run_every +
-                timedelta(seconds=10))
-        p.save()
-        self.assertTrue(p in PeriodicTaskMeta.objects.get_waiting_tasks())
-        self.assertTrue(isinstance(p.task, TestPeriodicTask))
-
-        p.delay()

+ 21 - 22
celery/tests/test_worker.py

@@ -3,12 +3,13 @@ from Queue import Queue, Empty
 from carrot.connection import BrokerConnection
 from celery.messaging import TaskConsumer
 from celery.worker.job import TaskWrapper
-from celery.worker import AMQPListener, WorkController
+from celery.worker import CarrotListener, WorkController
 from multiprocessing import get_logger
 from carrot.backends.base import BaseMessage
 from celery import registry
 from celery.serialization import pickle
 from celery.utils import gen_unique_id
+from celery.worker.scheduler import Scheduler
 from datetime import datetime, timedelta
 from celery.decorators import task as task_dec
 
@@ -81,16 +82,16 @@ def create_message(backend, **data):
                        content_encoding="binary")
 
 
-class TestAMQPListener(unittest.TestCase):
+class TestCarrotListener(unittest.TestCase):
 
     def setUp(self):
-        self.bucket_queue = Queue()
-        self.hold_queue = Queue()
+        self.ready_queue = Queue()
+        self.eta_scheduler = Scheduler(self.ready_queue)
         self.logger = get_logger()
         self.logger.setLevel(0)
 
     def test_connection(self):
-        l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
 
         c = l.reset_connection()
         self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
@@ -107,30 +108,30 @@ class TestAMQPListener(unittest.TestCase):
         self.assertTrue(l.task_consumer is None)
 
     def test_receieve_message(self):
-        l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
                            args=[2, 4, 8], kwargs={})
 
         l.receive_message(m.decode(), m)
 
-        in_bucket = self.bucket_queue.get_nowait()
+        in_bucket = self.ready_queue.get_nowait()
         self.assertTrue(isinstance(in_bucket, TaskWrapper))
         self.assertEquals(in_bucket.task_name, foo_task.name)
         self.assertEquals(in_bucket.execute(), 2 * 4 * 8)
-        self.assertRaises(Empty, self.hold_queue.get_nowait)
+        self.assertTrue(self.eta_scheduler.empty())
 
     def test_receieve_message_not_registered(self):
-        l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
         backend = MockBackend()
         m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
 
         self.assertFalse(l.receive_message(m.decode(), m))
-        self.assertRaises(Empty, self.bucket_queue.get_nowait)
-        self.assertRaises(Empty, self.hold_queue.get_nowait)
+        self.assertRaises(Empty, self.ready_queue.get_nowait)
+        self.assertTrue(self.eta_scheduler.empty())
 
     def test_receieve_message_eta(self):
-        l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
+        l = CarrotListener(self.ready_queue, self.eta_scheduler, self.logger)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
                            args=[2, 4, 8], kwargs={},
@@ -138,15 +139,14 @@ class TestAMQPListener(unittest.TestCase):
 
         l.receive_message(m.decode(), m)
 
-        in_hold = self.hold_queue.get_nowait()
-        self.assertEquals(len(in_hold), 3)
-        task, eta, on_accept = in_hold
+        in_hold = self.eta_scheduler.queue[0]
+        self.assertEquals(len(in_hold), 4)
+        eta, priority, task, on_accept = in_hold
         self.assertTrue(isinstance(task, TaskWrapper))
-        self.assertTrue(isinstance(eta, datetime))
         self.assertTrue(callable(on_accept))
         self.assertEquals(task.task_name, foo_task.name)
         self.assertEquals(task.execute(), 2 * 4 * 8)
-        self.assertRaises(Empty, self.bucket_queue.get_nowait)
+        self.assertRaises(Empty, self.ready_queue.get_nowait)
 
 
 class TestWorkController(unittest.TestCase):
@@ -158,12 +158,11 @@ class TestWorkController(unittest.TestCase):
 
     def test_attrs(self):
         worker = self.worker
-        self.assertTrue(hasattr(worker.bucket_queue, "get"))
-        self.assertTrue(hasattr(worker.bucket_queue, "put"))
-        self.assertTrue(isinstance(worker.hold_queue, Queue))
-        self.assertTrue(worker.periodic_work_controller)
+        self.assertTrue(isinstance(worker.ready_queue, Queue))
+        self.assertTrue(isinstance(worker.eta_scheduler, Scheduler))
+        self.assertTrue(worker.schedule_controller)
         self.assertTrue(worker.pool)
-        self.assertTrue(worker.amqp_listener)
+        self.assertTrue(worker.broker_listener)
         self.assertTrue(worker.mediator)
         self.assertTrue(worker.components)
 

+ 6 - 48
celery/tests/test_worker_controllers.py

@@ -4,7 +4,7 @@ import multiprocessing
 from Queue import Queue, Empty
 from datetime import datetime, timedelta
 
-from celery.worker.controllers import Mediator, PeriodicWorkController
+from celery.worker.controllers import Mediator
 from celery.worker.controllers import BackgroundThread
 
 
@@ -48,8 +48,8 @@ class TestBackgroundThread(unittest.TestCase):
 class TestMediator(unittest.TestCase):
 
     def test_mediator_start__stop(self):
-        bucket_queue = Queue()
-        m = Mediator(bucket_queue, lambda t: t)
+        ready_queue = Queue()
+        m = Mediator(ready_queue, lambda t: t)
         m.start()
         self.assertFalse(m._shutdown.isSet())
         self.assertFalse(m._stopped.isSet())
@@ -59,57 +59,15 @@ class TestMediator(unittest.TestCase):
         self.assertTrue(m._stopped.isSet())
 
     def test_mediator_on_iteration(self):
-        bucket_queue = Queue()
+        ready_queue = Queue()
         got = {}
 
         def mycallback(value):
             got["value"] = value.value
 
-        m = Mediator(bucket_queue, mycallback)
-        bucket_queue.put(MockTask("George Constanza"))
+        m = Mediator(ready_queue, mycallback)
+        ready_queue.put(MockTask("George Constanza"))
 
         m.on_iteration()
 
         self.assertEquals(got["value"], "George Constanza")
-
-
-class TestPeriodicWorkController(unittest.TestCase):
-
-    def test_process_hold_queue(self):
-        bucket_queue = Queue()
-        hold_queue = Queue()
-        m = PeriodicWorkController(bucket_queue, hold_queue)
-        m.process_hold_queue()
-
-        scratchpad = {}
-
-        def on_accept():
-            scratchpad["accepted"] = True
-
-        hold_queue.put((MockTask("task1"),
-                        datetime.now() - timedelta(days=1),
-                        on_accept))
-
-        m.process_hold_queue()
-        self.assertRaises(Empty, hold_queue.get_nowait)
-        self.assertTrue(scratchpad.get("accepted"))
-        self.assertEquals(bucket_queue.get_nowait().value, "task1")
-        tomorrow = datetime.now() + timedelta(days=1)
-        hold_queue.put((MockTask("task2"), tomorrow, on_accept))
-        m.process_hold_queue()
-        self.assertRaises(Empty, bucket_queue.get_nowait)
-        value, eta, on_accept = hold_queue.get_nowait()
-        self.assertEquals(value.value, "task2")
-        self.assertEquals(eta, tomorrow)
-
-    def test_run_periodic_tasks(self):
-        bucket_queue = Queue()
-        hold_queue = Queue()
-        m = PeriodicWorkController(bucket_queue, hold_queue)
-        m.run_periodic_tasks()
-
-    def test_on_iteration(self):
-        bucket_queue = Queue()
-        hold_queue = Queue()
-        m = PeriodicWorkController(bucket_queue, hold_queue)
-        m.on_iteration()

+ 38 - 36
celery/worker/__init__.py

@@ -4,9 +4,10 @@ The Multiprocessing Worker Server
 
 """
 from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
-from celery.worker.controllers import Mediator, PeriodicWorkController
+from celery.worker.controllers import Mediator, ScheduleController
 from celery.beat import ClockServiceThread
 from celery.worker.job import TaskWrapper
+from celery.worker.scheduler import Scheduler
 from celery.exceptions import NotRegistered
 from celery.messaging import get_consumer_set
 from celery.log import setup_logger
@@ -22,20 +23,20 @@ import logging
 import socket
 
 
-class AMQPListener(object):
+class CarrotListener(object):
     """Listen for messages received from the AMQP broker and
     move them to the bucket queue for task processing.
 
-    :param bucket_queue: See :attr:`bucket_queue`.
-    :param hold_queue: See :attr:`hold_queue`.
+    :param ready_queue: See :attr:`ready_queue`.
+    :param eta_scheduler: See :attr:`eta_scheduler`.
 
-    .. attribute:: bucket_queue
+    .. attribute:: ready_queue
 
         The queue that holds tasks ready for processing immediately.
 
-    .. attribute:: hold_queue
+    .. attribute:: eta_scheduler
 
-        The queue that holds paused tasks. Reasons for being paused include
+        Scheduler for paused tasks. Reasons for being paused include
         a countdown/eta or that it's waiting for retry.
 
     .. attribute:: logger
@@ -44,12 +45,12 @@ class AMQPListener(object):
 
     """
 
-    def __init__(self, bucket_queue, hold_queue, logger,
+    def __init__(self, ready_queue, eta_scheduler, logger,
             initial_prefetch_count=2):
         self.amqp_connection = None
         self.task_consumer = None
-        self.bucket_queue = bucket_queue
-        self.hold_queue = hold_queue
+        self.ready_queue = ready_queue
+        self.eta_scheduler = eta_scheduler
         self.logger = logger
         self.prefetch_count = SharedCounter(initial_prefetch_count)
 
@@ -66,17 +67,17 @@ class AMQPListener(object):
             try:
                 self.consume_messages()
             except (socket.error, AMQPConnectionException, IOError):
-                self.logger.error("AMQPListener: Connection to broker lost. "
-                                + "Trying to re-establish connection...")
+                self.logger.error("CarrotListener: Connection to broker lost."
+                                + " Trying to re-establish connection...")
 
     def consume_messages(self):
         """Consume messages forever (or until an exception is raised)."""
         task_consumer = self.task_consumer
 
-        self.logger.debug("AMQPListener: Starting message consumer...")
+        self.logger.debug("CarrotListener: Starting message consumer...")
         it = task_consumer.iterconsume(limit=None)
 
-        self.logger.debug("AMQPListener: Ready to accept tasks!")
+        self.logger.debug("CarrotListener: Ready to accept tasks!")
 
         while True:
             self.task_consumer.qos(prefetch_count=int(self.prefetch_count))
@@ -106,11 +107,13 @@ class AMQPListener(object):
             self.prefetch_count.increment()
             self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
                     task.task_name, task.task_id, eta))
-            self.hold_queue.put((task, eta, self.prefetch_count.decrement))
+            self.eta_scheduler.enter(task,
+                                     eta=eta,
+                                     callback=self.prefetch_count.decrement)
         else:
             self.logger.info("Got task from broker: %s[%s]" % (
                     task.task_name, task.task_id))
-            self.bucket_queue.put(task)
+            self.ready_queue.put(task)
 
     def close_connection(self):
         """Close the AMQP connection."""
@@ -119,7 +122,7 @@ class AMQPListener(object):
             self.task_consumer = None
         if self.amqp_connection:
             self.logger.debug(
-                    "AMQPListener: Closing connection to the broker...")
+                    "CarrotListener: Closing connection to the broker...")
             self.amqp_connection.close()
             self.amqp_connection = None
 
@@ -131,7 +134,7 @@ class AMQPListener(object):
 
         """
         self.logger.debug(
-                "AMQPListener: Re-establishing connection to the broker...")
+                "CarrotListener: Re-establishing connection to the broker...")
         self.close_connection()
         self.amqp_connection = self._open_connection()
         self.task_consumer = get_consumer_set(connection=self.amqp_connection)
@@ -161,7 +164,7 @@ class AMQPListener(object):
         conn = retry_over_time(_establish_connection, (socket.error, IOError),
                                errback=_connection_error_handler,
                                max_retries=conf.AMQP_CONNECTION_MAX_RETRIES)
-        self.logger.debug("AMQPListener: Connection Established.")
+        self.logger.debug("CarrotListener: Connection Established.")
         return conn
 
 
@@ -199,7 +202,7 @@ class WorkController(object):
 
         The :class:`multiprocessing.Pool` instance used.
 
-    .. attribute:: bucket_queue
+    .. attribute:: ready_queue
 
         The :class:`Queue.Queue` that holds tasks ready for immediate
         processing.
@@ -210,17 +213,17 @@ class WorkController(object):
         back the task include waiting for ``eta`` to pass or the task is being
         retried.
 
-    .. attribute:: periodic_work_controller
+    .. attribute:: schedule_controller
 
-        Instance of :class:`celery.worker.controllers.PeriodicWorkController`.
+        Instance of :class:`celery.worker.controllers.ScheduleController`.
 
     .. attribute:: mediator
 
         Instance of :class:`celery.worker.controllers.Mediator`.
 
-    .. attribute:: amqp_listener
+    .. attribute:: broker_listener
 
-        Instance of :class:`AMQPListener`.
+        Instance of :class:`CarrotListener`.
 
     """
     loglevel = logging.ERROR
@@ -241,22 +244,21 @@ class WorkController(object):
 
         # Queues
         if conf.DISABLE_RATE_LIMITS:
-            self.bucket_queue = Queue()
+            self.ready_queue = Queue()
         else:
-            self.bucket_queue = TaskBucket(task_registry=registry.tasks)
-        self.hold_queue = Queue()
+            self.ready_queue = TaskBucket(task_registry=registry.tasks)
+        self.eta_scheduler = Scheduler(self.ready_queue)
 
         self.logger.debug("Instantiating thread components...")
 
         # Threads+Pool
-        self.periodic_work_controller = PeriodicWorkController(
-                                                    self.bucket_queue,
-                                                    self.hold_queue)
+        self.schedule_controller = ScheduleController(self.eta_scheduler)
         self.pool = TaskPool(self.concurrency, logger=self.logger)
-        self.amqp_listener = AMQPListener(self.bucket_queue, self.hold_queue,
-                                          logger=self.logger,
-                                          initial_prefetch_count=concurrency)
-        self.mediator = Mediator(self.bucket_queue, self.safe_process_task)
+        self.broker_listener = CarrotListener(self.ready_queue,
+                                        self.eta_scheduler,
+                                        logger=self.logger,
+                                        initial_prefetch_count=concurrency)
+        self.mediator = Mediator(self.ready_queue, self.safe_process_task)
 
         self.clockservice = None
         if self.embed_clockservice:
@@ -268,9 +270,9 @@ class WorkController(object):
         # and they must be stopped in reverse order.
         self.components = filter(None, (self.pool,
                                         self.mediator,
-                                        self.periodic_work_controller,
+                                        self.schedule_controller,
                                         self.clockservice,
-                                        self.amqp_listener))
+                                        self.broker_listener))
 
     def start(self):
         """Starts the workers main loop."""

+ 19 - 47
celery/worker/controllers.py

@@ -65,29 +65,29 @@ class BackgroundThread(threading.Thread):
 class Mediator(BackgroundThread):
     """Thread continuously sending tasks in the queue to the pool.
 
-    .. attribute:: bucket_queue
+    .. attribute:: ready_queue
 
         The task queue, a :class:`Queue.Queue` instance.
 
     .. attribute:: callback
 
         The callback used to process tasks retrieved from the
-        :attr:`bucket_queue`.
+        :attr:`ready_queue`.
 
     """
 
-    def __init__(self, bucket_queue, callback):
+    def __init__(self, ready_queue, callback):
         super(Mediator, self).__init__()
-        self.bucket_queue = bucket_queue
+        self.ready_queue = ready_queue
         self.callback = callback
 
     def on_iteration(self):
         """Get tasks from bucket queue and apply the task callback."""
         logger = get_default_logger()
         try:
-            logger.debug("Mediator: Trying to get message from bucket_queue")
+            logger.debug("Mediator: Trying to get message from ready_queue")
             # This blocks for up to one second waiting for a message.
-            task = self.bucket_queue.get(timeout=1)
+            task = self.ready_queue.get(timeout=1)
         except QueueEmpty:
             logger.debug("Mediator: Bucket queue is empty.")
         else:
@@ -96,48 +96,20 @@ class Mediator(BackgroundThread):
             self.callback(task)
 
 
-class PeriodicWorkController(BackgroundThread):
-    """Finds tasks in the hold queue that is
-    ready for execution and moves them to the bucket queue.
+class ScheduleController(BackgroundThread):
+    """Schedules tasks with an ETA by moving them to the bucket queue."""
 
-    (Tasks in the hold queue are tasks waiting for retry, or with an
-    ``eta``/``countdown``.)
-
-    """
-
-    def __init__(self, bucket_queue, hold_queue):
-        super(PeriodicWorkController, self).__init__()
-        self.hold_queue = hold_queue
-        self.bucket_queue = bucket_queue
+    def __init__(self, eta_schedule):
+        super(ScheduleController, self).__init__()
+        self._scheduler = iter(eta_schedule)
 
     def on_iteration(self):
-        """Process the hold queue."""
-        logger = get_default_logger()
-        logger.debug("PeriodicWorkController: Processing hold queue...")
-        self.process_hold_queue()
-        logger.debug("PeriodicWorkController: Going to sleep...")
-        time.sleep(1)
-
-    def process_hold_queue(self):
-        """Finds paused tasks that are ready for execution and move
-        them to the :attr:`bucket_queue`."""
+        """Wake-up scheduler"""
         logger = get_default_logger()
-        try:
-            logger.debug(
-                "PeriodicWorkController: Getting next task from hold queue..")
-            task, eta, on_accept = self.hold_queue.get_nowait()
-        except QueueEmpty:
-            logger.debug("PeriodicWorkController: Hold queue is empty")
-            return
-
-        if datetime.now() >= eta:
-            logger.debug(
-                "PeriodicWorkController: Time to run %s[%s] (%s)..." % (
-                    task.task_name, task.task_id, eta))
-            on_accept() # Run the accept task callback.
-            self.bucket_queue.put(task)
-        else:
-            logger.debug(
-                "PeriodicWorkController: ETA not ready for %s[%s] (%s)..." % (
-                    task.task_name, task.task_id, eta))
-            self.hold_queue.put((task, eta, on_accept))
+        logger.debug("ScheduleController: Scheduler wake-up")
+        delay = self._scheduler.next()
+        logger.debug(
+            "ScheduleController: Next wake-up estimated at %s seconds..." % (
+                delay))
+        time.sleep(delay)
+

+ 50 - 0
celery/worker/scheduler.py

@@ -0,0 +1,50 @@
+import heapq
+import time
+
+
+class Scheduler(object):
+
+    def __init__(self, ready_queue):
+        self.ready_queue = ready_queue
+        self._queue = []
+
+    def enter(self, item, eta=None, priority=0, callback=None):
+        eta = time.mktime(eta.timetuple()) if eta else time.time()
+        heapq.heappush(self._queue, (eta, priority, item, callback))
+
+    def __iter__(self):
+        """The iterator yields the time to sleep for between runs."""
+
+        # localize variable access
+        q = self._queue
+        nowfun = time.time
+        pop = heapq.heappop
+        ready_queue = self.ready_queue
+
+        while True:
+            if q:
+                eta, priority, item, callback = verify = q[0]
+                now = nowfun()
+
+                if now < eta:
+                    yield eta - now
+                else:
+                    event = pop(q)
+                    print("eta->%s priority->%s item->%s" % (
+                        eta, priority, item))
+
+                    if event is verify:
+                        ready_queue.put(item)
+                        callback and callback()
+                        yield 0
+                    else:
+                        heapq.heappush(q, event)
+            yield 1
+
+    def empty(self):
+        return not self._queue
+
+    @property
+    def queue(self):
+        events = list(self._queue)
+        return map(heapq.heappop, [events]*len(events))