Selaa lähdekoodia

ETA scheduler moved to external timer2 module.

Ask Solem 14 vuotta sitten
vanhempi
commit
f6ec0e3b5a

+ 1 - 1
celery/conf.py

@@ -48,7 +48,7 @@ _DEFAULTS = {
     "CELERYD_POOL_PUTLOCKS": True,
     "CELERYD_POOL": "celery.concurrency.processes.TaskPool",
     "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
-    "CELERYD_ETA_SCHEDULER": "celery.worker.controllers.ScheduleController",
+    "CELERYD_ETA_SCHEDULER": "timer2.Timer",
     "CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
     "CELERYD_CONCURRENCY": 0, # defaults to cpu count
     "CELERYD_PREFETCH_MULTIPLIER": 4,

+ 7 - 7
celery/tests/test_worker.py

@@ -7,6 +7,7 @@ from Queue import Empty
 
 from carrot.backends.base import BaseMessage
 from carrot.connection import BrokerConnection
+from timer2 import Timer
 
 from celery import conf
 from celery.decorators import task as task_dec
@@ -17,7 +18,6 @@ from celery.worker import WorkController
 from celery.worker.buckets import FastQueue
 from celery.worker.job import TaskRequest
 from celery.worker.listener import CarrotListener, QoS, RUN
-from celery.worker.scheduler import Scheduler
 
 from celery.tests.compat import catch_warnings
 from celery.tests.utils import execute_context
@@ -160,7 +160,7 @@ class test_CarrotListener(unittest.TestCase):
 
     def setUp(self):
         self.ready_queue = FastQueue()
-        self.eta_schedule = Scheduler(self.ready_queue)
+        self.eta_schedule = Timer()
         self.logger = get_logger()
         self.logger.setLevel(0)
 
@@ -336,7 +336,7 @@ class test_CarrotListener(unittest.TestCase):
         items = [entry[2] for entry in self.eta_schedule.queue]
         found = 0
         for item in items:
-            if item.task_name == foo_task.name:
+            if item.args[0].task_name == foo_task.name:
                 found = True
         self.assertTrue(found)
         self.assertTrue(l.task_consumer.prefetch_count_incremented)
@@ -388,10 +388,10 @@ class test_CarrotListener(unittest.TestCase):
         l.receive_message(m.decode(), m)
 
         in_hold = self.eta_schedule.queue[0]
-        self.assertEqual(len(in_hold), 4)
-        eta, priority, task, on_accept = in_hold
+        self.assertEqual(len(in_hold), 3)
+        eta, priority, entry = in_hold
+        task = entry.args[0]
         self.assertIsInstance(task, TaskRequest)
-        self.assertTrue(callable(on_accept))
         self.assertEqual(task.task_name, foo_task.name)
         self.assertEqual(task.execute(), 2 * 4 * 8)
         self.assertRaises(Empty, self.ready_queue.get_nowait)
@@ -466,7 +466,7 @@ class test_WorkController(unittest.TestCase):
 
     def test_attrs(self):
         worker = self.worker
-        self.assertIsInstance(worker.eta_schedule, Scheduler)
+        self.assertIsInstance(worker.scheduler, Timer)
         self.assertTrue(worker.scheduler)
         self.assertTrue(worker.pool)
         self.assertTrue(worker.listener)

+ 5 - 3
celery/tests/test_worker_control.py

@@ -1,6 +1,8 @@
 import socket
 import unittest2 as unittest
 
+from timer2 import Timer
+
 from celery import conf
 from celery.decorators import task
 from celery.registry import tasks
@@ -10,7 +12,6 @@ from celery.worker import control
 from celery.worker.buckets import FastQueue
 from celery.worker.job import TaskRequest
 from celery.worker.state import revoked
-from celery.worker.scheduler import Scheduler
 
 hostname = socket.gethostname()
 
@@ -44,7 +45,7 @@ class Listener(object):
                                          task_id=gen_unique_id(),
                                          args=(2, 2),
                                          kwargs={}))
-        self.eta_schedule = Scheduler(self.ready_queue)
+        self.eta_schedule = Timer()
         self.event_dispatcher = Dispatcher()
 
 
@@ -81,7 +82,8 @@ class test_ControlPanel(unittest.TestCase):
         listener = Listener()
         panel = self.create_panel(listener=listener)
         self.assertFalse(panel.execute("dump_schedule"))
-        listener.eta_schedule.enter("foo", eta=100)
+        import operator
+        listener.eta_schedule.schedule.enter(100, operator.add, (2, 2))
         self.assertTrue(panel.execute("dump_schedule"))
 
     def test_dump_reserved(self):

+ 6 - 58
celery/tests/test_worker_controllers.py

@@ -1,10 +1,9 @@
-import time
 import unittest2 as unittest
+
 from Queue import Queue
 
 from celery.utils import gen_unique_id
 from celery.worker.controllers import Mediator
-from celery.worker.controllers import BackgroundThread, ScheduleController
 from celery.worker.state import revoked as revoked_tasks
 
 
@@ -26,35 +25,7 @@ class MockTask(object):
         return False
 
 
-class MyBackgroundThread(BackgroundThread):
-
-    def on_iteration(self):
-        time.sleep(1)
-
-
-class TestBackgroundThread(unittest.TestCase):
-
-    def test_on_iteration(self):
-        self.assertRaises(NotImplementedError,
-                BackgroundThread().on_iteration)
-
-    def test_run(self):
-        t = MyBackgroundThread()
-        t._shutdown.set()
-        t.run()
-        self.assertTrue(t._stopped.isSet())
-
-    def test_start_stop(self):
-        t = MyBackgroundThread()
-        t.start()
-        self.assertFalse(t._shutdown.isSet())
-        self.assertFalse(t._stopped.isSet())
-        t.stop()
-        self.assertTrue(t._shutdown.isSet())
-        self.assertTrue(t._stopped.isSet())
-
-
-class TestMediator(unittest.TestCase):
+class test_Mediator(unittest.TestCase):
 
     def test_mediator_start__stop(self):
         ready_queue = Queue()
@@ -67,7 +38,7 @@ class TestMediator(unittest.TestCase):
         self.assertTrue(m._shutdown.isSet())
         self.assertTrue(m._stopped.isSet())
 
-    def test_mediator_on_iteration(self):
+    def test_mediator_move(self):
         ready_queue = Queue()
         got = {}
 
@@ -77,11 +48,11 @@ class TestMediator(unittest.TestCase):
         m = Mediator(ready_queue, mycallback)
         ready_queue.put(MockTask("George Costanza"))
 
-        m.on_iteration()
+        m.move()
 
         self.assertEqual(got["value"], "George Costanza")
 
-    def test_mediator_on_iteration_revoked(self):
+    def test_mediator_move_revoked(self):
         ready_queue = Queue()
         got = {}
 
@@ -94,30 +65,7 @@ class TestMediator(unittest.TestCase):
         revoked_tasks.add(t.task_id)
         ready_queue.put(t)
 
-        m.on_iteration()
+        m.move()
 
         self.assertNotIn("value", got)
         self.assertTrue(t.acked)
-
-
-class TestScheduleController(unittest.TestCase):
-
-    def test_on_iteration(self):
-        times = range(10) + [None]
-        c = ScheduleController(times)
-
-        import time
-        slept = [None]
-
-        def _sleep(count):
-            slept[0] = count
-
-        old_sleep = time.sleep
-        time.sleep = _sleep
-        try:
-            for i in times:
-                c.on_iteration()
-                res = i or 1
-                self.assertEqual(slept[0], res)
-        finally:
-            time.sleep = old_sleep

+ 0 - 97
celery/tests/test_worker_scheduler.py

@@ -1,97 +0,0 @@
-from __future__ import generators
-
-import unittest2 as unittest
-
-from datetime import datetime, timedelta
-from Queue import Queue, Empty
-
-from celery.worker.scheduler import Scheduler
-
-
-class MockItem(object):
-
-    def __init__(self, value, revoked=False):
-        self.task_id = value
-        self.is_revoked = revoked
-
-    def revoked(self):
-        return self.is_revoked
-
-
-class test_Scheduler(unittest.TestCase):
-
-    def test_sched_and_run_now(self):
-        ready_queue = Queue()
-        sched = Scheduler(ready_queue)
-        now = datetime.now()
-
-        callback_called = [False]
-        def callback():
-            callback_called[0] = True
-
-        sched.enter(MockItem("foo"), eta=now, callback=callback)
-
-        remaining = iter(sched).next()
-        self.assertIsNone(remaining)
-        self.assertTrue(callback_called[0])
-        self.assertEqual(ready_queue.get_nowait().task_id, "foo")
-
-    def test_sched_revoked(self):
-        ready_queue = Queue()
-        sched = Scheduler(ready_queue)
-        now = datetime.now()
-
-        callback_called = [False]
-        def callback():
-            callback_called[0] = True
-
-        sched.enter(MockItem("foo", revoked=True), eta=now, callback=callback)
-        iter(sched).next()
-        self.assertFalse(callback_called[0])
-        self.assertRaises(Empty, ready_queue.get_nowait)
-        self.assertFalse(sched.queue)
-        sched.clear()
-
-    def test_sched_clear(self):
-        ready_queue = Queue()
-        sched = Scheduler(ready_queue)
-        sched.enter(MockItem("foo"), eta=datetime.now(), callback=None)
-        self.assertFalse(sched.empty())
-        sched.clear()
-        self.assertTrue(sched.empty())
-
-    def test_sched_info(self):
-        ready_queue = Queue()
-        sched = Scheduler(ready_queue)
-        item = MockItem("foo")
-        sched.enter(item, eta=10, callback=None)
-        self.assertDictEqual({"eta": 10, "priority": 0,
-                              "item": item}, sched.info().next())
-
-    def test_sched_queue(self):
-        ready_queue = Queue()
-        sched = Scheduler(ready_queue)
-        sched.enter(MockItem("foo"), eta=datetime.now(), callback=None)
-        self.assertTrue(sched.queue)
-
-    def test_sched_run_later(self):
-        ready_queue = Queue()
-        sched = Scheduler(ready_queue)
-        now = datetime.now()
-
-        callback_called = [False]
-        def callback():
-            callback_called[0] = True
-
-        eta = now + timedelta(seconds=10)
-        sched.enter(MockItem("foo"), eta=eta, callback=callback)
-
-        remaining = iter(sched).next()
-        self.assertTrue(remaining > 7 or remaining == sched.max_interval)
-        self.assertFalse(callback_called[0])
-        self.assertRaises(Empty, ready_queue.get_nowait)
-
-    def test_empty_queue_yields_None(self):
-        ready_queue = Queue()
-        sched = Scheduler(ready_queue)
-        self.assertIsNone(iter(sched).next())

+ 17 - 4
celery/worker/__init__.py

@@ -8,7 +8,10 @@ import logging
 import traceback
 from multiprocessing.util import Finalize
 
+from timer2 import Timer
+
 from celery import conf
+from celery import log
 from celery import registry
 from celery import platform
 from celery import signals
@@ -18,7 +21,6 @@ from celery.utils import noop, instantiate
 
 from celery.worker import state
 from celery.worker.buckets import TaskBucket, FastQueue
-from celery.worker.scheduler import Scheduler
 
 RUN = 0x1
 CLOSE = 0x2
@@ -146,6 +148,8 @@ class WorkController(object):
         self.task_soft_time_limit = task_soft_time_limit
         self.max_tasks_per_child = max_tasks_per_child
         self.pool_putlocks = pool_putlocks
+        self.timer_debug = log.SilenceRepeated(self.logger.debug,
+                                               max_iterations=10)
         self.db = db
         self._finalize = Finalize(self, self.stop, exitpriority=1)
 
@@ -158,7 +162,6 @@ class WorkController(object):
             self.ready_queue = FastQueue()
         else:
             self.ready_queue = TaskBucket(task_registry=registry.tasks)
-        self.eta_schedule = Scheduler(self.ready_queue, logger=self.logger)
 
         self.logger.debug("Instantiating thread components...")
 
@@ -174,7 +177,9 @@ class WorkController(object):
                                     callback=self.process_task,
                                     logger=self.logger)
         self.scheduler = instantiate(eta_scheduler_cls,
-                                     self.eta_schedule, logger=self.logger)
+                               precision=conf.CELERYD_ETA_SCHEDULER_PRECISION,
+                               on_error=self.on_timer_error,
+                               on_tick=self.on_timer_tick)
 
         self.clockservice = None
         if self.embed_clockservice:
@@ -184,7 +189,7 @@ class WorkController(object):
         prefetch_count = self.concurrency * conf.CELERYD_PREFETCH_MULTIPLIER
         self.listener = instantiate(listener_cls,
                                     self.ready_queue,
-                                    self.eta_schedule,
+                                    self.scheduler,
                                     logger=self.logger,
                                     hostname=self.hostname,
                                     send_events=self.send_events,
@@ -252,3 +257,11 @@ class WorkController(object):
 
         self.listener.close_connection()
         self._state = TERMINATE
+
+    def on_timer_error(self, exc_info):
+        _, exc, _ = exc_info
+        self.logger.error("Timer error: %r" % (exc, ))
+
+    def on_timer_tick(self, delay):
+        self.timer_debug("Scheduler wake-up! Next eta %s secs." % delay)
+

+ 1 - 1
celery/worker/control/builtins.py

@@ -99,7 +99,7 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
 
 @Panel.register
 def dump_schedule(panel, safe=False, **kwargs):
-    schedule = panel.listener.eta_schedule
+    schedule = panel.listener.eta_schedule.schedule
     if not schedule.queue:
         panel.logger.info("--Empty schedule--")
         return []

+ 17 - 75
celery/worker/controllers.py

@@ -11,59 +11,7 @@ from celery import conf
 from celery import log
 
 
-class BackgroundThread(threading.Thread):
-    """Thread running an infinite loop which for every iteration
-    calls its :meth:`on_iteration` method.
-
-    This also implements graceful shutdown of the thread by providing
-    the :meth:`stop` method.
-
-    """
-
-    def __init__(self):
-        super(BackgroundThread, self).__init__()
-        self._shutdown = threading.Event()
-        self._stopped = threading.Event()
-        self.setDaemon(True)
-
-    def run(self):
-        """This is the body of the thread.
-
-        To start the thread use :meth:`start` instead.
-
-        """
-        self.on_start()
-
-        while 1:
-            if self._shutdown.isSet():
-                break
-            self.on_iteration()
-        self._stopped.set() # indicate that we are stopped
-
-    def on_start(self):
-        """This handler is run at thread start, just before the infinite
-        loop."""
-        pass
-
-    def on_iteration(self):
-        """This is the method called for every iteration and must be
-        implemented by every subclass of :class:`BackgroundThread`."""
-        raise NotImplementedError(
-                "InfiniteThreads must implement on_iteration")
-
-    def on_stop(self):
-        """This handler is run when the thread is shutdown."""
-        pass
-
-    def stop(self):
-        """Gracefully shutdown the thread."""
-        self.on_stop()
-        self._shutdown.set()
-        self._stopped.wait() # block until this thread is done
-        self.join(1e100)
-
-
-class Mediator(BackgroundThread):
+class Mediator(threading.Thread):
     """Thread continuously sending tasks in the queue to the pool.
 
     .. attribute:: ready_queue
@@ -78,13 +26,15 @@ class Mediator(BackgroundThread):
     """
 
     def __init__(self, ready_queue, callback, logger=None):
-        super(Mediator, self).__init__()
+        threading.Thread.__init__(self)
         self.logger = logger or log.get_default_logger()
         self.ready_queue = ready_queue
         self.callback = callback
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+        self.setDaemon(True)
 
-    def on_iteration(self):
-        """Get tasks from bucket queue and apply the task callback."""
+    def move(self):
         try:
             # This blocks until there's a message in the queue.
             task = self.ready_queue.get(timeout=1)
@@ -95,25 +45,17 @@ class Mediator(BackgroundThread):
                 return
 
             self.logger.debug(
-                    "Mediator: Running callback for task: %s[%s]" % (
-                        task.task_name, task.task_id))
+                "Mediator: Running callback for task: %s[%s]" % (
+                    task.task_name, task.task_id))
             self.callback(task) # execute
 
+    def run(self):
+        while not self._shutdown.isSet():
+            self.move()
+        self._stopped.set() # indicate that we are stopped
 
-class ScheduleController(BackgroundThread):
-    """Schedules tasks with an ETA by moving them to the bucket queue."""
-
-    def __init__(self, eta_schedule, logger=None,
-            precision=None):
-        super(ScheduleController, self).__init__()
-        self.logger = logger or log.get_default_logger()
-        self._scheduler = iter(eta_schedule)
-        self.precision = precision or conf.CELERYD_ETA_SCHEDULER_PRECISION
-        self.debug = log.SilenceRepeated(self.logger.debug, max_iterations=10)
-
-    def on_iteration(self):
-        """Wake-up scheduler"""
-        delay = self._scheduler.next()
-        self.debug("ScheduleController: Scheduler wake-up"
-                "ScheduleController: Next wake-up eta %s seconds..." % delay)
-        time.sleep(delay or self.precision)
+    def stop(self):
+        """Gracefully shutdown the thread."""
+        self._shutdown.set()
+        self._stopped.wait() # block until this thread is done
+        self.join(1e100)

+ 7 - 3
celery/worker/listener.py

@@ -46,7 +46,7 @@ up and running.
   again, and again.
 
 * If the task has an ETA/countdown, the task is moved to the ``eta_schedule``
-  so the :class:`~celery.worker.scheduler.Scheduler` can schedule it at its
+  so the :class:`timer2.Timer` can schedule it at its
   deadline. Tasks without an eta are moved immediately to the ``ready_queue``,
   so they can be picked up by the :class:`~celery.worker.controllers.Mediator`
   to be sent to the pool.
@@ -267,11 +267,15 @@ class CarrotListener(object):
 
         if task.eta:
             self.qos.increment()
-            self.eta_schedule.enter(task, eta=task.eta,
-                    callback=self.qos.decrement_eventually)
+            self.eta_schedule.apply_at(task.eta,
+                                       self.apply_eta_task, (task, ))
         else:
             self.ready_queue.put(task)
 
+    def apply_eta_task(self, task):
+        self.ready_queue.put(task)
+        self.qos.decrement_eventually()
+
     def on_control(self, control):
         """Handle received remote control command."""
         return self.control_dispatch.dispatch_from_message(control)

+ 0 - 98
celery/worker/scheduler.py

@@ -1,98 +0,0 @@
-from __future__ import generators
-
-import time
-import heapq
-
-from datetime import datetime
-
-from celery import log
-
-DEFAULT_MAX_INTERVAL = 2
-
-
-class Scheduler(object):
-    """ETA scheduler.
-
-    :param ready_queue: Queue to move items ready for processing.
-    :keyword max_interval: Maximum sleep interval between iterations.
-        Default is 2 seconds.
-
-    """
-
-    def __init__(self, ready_queue, logger=None,
-            max_interval=DEFAULT_MAX_INTERVAL):
-        self.max_interval = float(max_interval)
-        self.ready_queue = ready_queue
-        self.logger = logger or log.get_default_logger()
-        self._queue = []
-
-    def enter(self, item, eta=None, priority=0, callback=None):
-        """Enter item into the scheduler.
-
-        :param item: Item to enter.
-        :param eta: Scheduled time as a :class:`datetime.datetime` object.
-        :param priority: Unused.
-        :param callback: Callback to call when the item is scheduled.
-            This callback takes no arguments.
-
-        """
-        if isinstance(eta, datetime):
-            try:
-                eta = time.mktime(eta.timetuple())
-            except OverflowError:
-                # this machine can't represent the passed in time as
-                # a unix timestamp just ignore this for now
-                self.logger.error("Cannot represent %s as a unix timestamp. "
-                                  "Ignoring %s." % (eta, item))
-                return
-        eta = eta or time.time()
-        heapq.heappush(self._queue, (eta, priority, item, callback))
-
-    def __iter__(self):
-        """The iterator yields the time to sleep for between runs."""
-
-        # localize variable access
-        nowfun = time.time
-        pop = heapq.heappop
-        ready_queue = self.ready_queue
-
-        while 1:
-            if self._queue:
-                eta, priority, item, callback = verify = self._queue[0]
-                now = nowfun()
-
-                if item.revoked():
-                    event = pop(self._queue)
-                    if event is not verify:
-                        heapq.heappush(self._queue, event)
-                    continue
-
-                if now < eta:
-                    yield min(eta - now, self.max_interval)
-                else:
-                    event = pop(self._queue)
-
-                    if event is verify:
-                        ready_queue.put(item)
-                        if callback is not None:
-                            callback()
-                        continue
-                    else:
-                        heapq.heappush(self._queue, event)
-            yield None
-
-    def empty(self):
-        """Is the schedule empty?"""
-        return not self._queue
-
-    def clear(self):
-        self._queue = []
-
-    def info(self):
-        return ({"eta": eta, "priority": priority, "item": item}
-                    for eta, priority, item, _ in self.queue)
-
-    @property
-    def queue(self):
-        events = list(self._queue)
-        return map(heapq.heappop, [events]*len(events))

+ 0 - 11
docs/internals/reference/celery.worker.scheduler.rst

@@ -1,11 +0,0 @@
-============================================
- Worker Scheduler - celery.worker.scheduler
-============================================
-
-.. contents::
-    :local:
-.. currentmodule:: celery.worker.scheduler
-
-.. automodule:: celery.worker.scheduler
-    :members:
-    :undoc-members:

+ 0 - 1
docs/internals/reference/index.rst

@@ -13,7 +13,6 @@
     celery.worker.job
     celery.worker.controllers
     celery.worker.buckets
-    celery.worker.scheduler
     celery.worker.heartbeat
     celery.worker.control
     celery.worker.control.builtins