
Only ever use one timer thread, and instead set the priority for items.
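
For illustration only (this snippet is not part of the diff): a minimal sketch of how a single shared timer2.Timer instance can hold different kinds of entries, distinguished by priority; the priority value 6 matches what the consumer passes in the diff below.

    from datetime import datetime, timedelta
    from celery.utils.timer2 import Timer

    timer = Timer()                     # the one timer thread for the worker

    def run_task():
        print("ETA task executed")

    # ETA entries are scheduled with a low priority (6), exactly as the
    # consumer does in consumer.py below; higher-priority entries (e.g.
    # heartbeats) share the same thread.
    timer.apply_at(datetime.now() + timedelta(seconds=10), run_task, (),
                   priority=6)

    timer.stop()                        # shut the timer thread down when done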

The CELERYD_ETA_SCHEDULER and CELERYD_ETA_SCHEDULER_PRECISION
settings have been renamed to:

    - CELERYD_TIMER
    - CELERYD_TIMER_PRECISION

We may consider adding aliases for the old names, but I doubt that they
are used.
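
For anyone upgrading, a hedged before/after sketch of a celeryconfig module (the class path shown is just the stock timer from celery.utils.timer2, as used in the tests, not a new requirement):

    # old names (no aliases are kept for these)
    CELERYD_ETA_SCHEDULER = "celery.utils.timer2.Timer"
    CELERYD_ETA_SCHEDULER_PRECISION = 1.0

    # new names introduced by this commit
    CELERYD_TIMER = "celery.utils.timer2.Timer"
    CELERYD_TIMER_PRECISION = 1.0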
Ask Solem, 13 years ago
parent commit c0ac1b763f

+ 3 - 3
celery/app/defaults.py

@@ -164,11 +164,11 @@ NAMESPACES = {
         "AUTORELOADER": Option("celery.worker.autoreload.Autoreloader"),
         "BOOT_STEPS": Option((), type="tuple"),
         "CONCURRENCY": Option(0, type="int"),
-        "ETA_SCHEDULER": Option(None, type="string"),
-        "ETA_SCHEDULER_PRECISION": Option(1.0, type="float"),
+        "TIMER": Option(None, type="string"),
+        "TIMER_PRECISION": Option(1.0, type="float"),
         "FORCE_EXECV": Option(True, type="bool"),
         "HIJACK_ROOT_LOGGER": Option(True, type="bool"),
-        "CONSUMER": Option("celery.worker.consumer.Consumer"),
+        "CONSUMER": Option(None, type="string"),
         "LOG_FORMAT": Option(DEFAULT_PROCESS_LOG_FMT),
         "LOG_COLOR": Option(type="bool"),
         "LOG_LEVEL": Option("WARN", deprecate_by="2.4", remove_by="3.0",

+ 3 - 3
celery/tests/worker/test_control.py

@@ -43,7 +43,7 @@ class Consumer(object):
                                          uuid(),
                                          args=(2, 2),
                                          kwargs={}))
-        self.eta_schedule = Timer()
+        self.timer = Timer()
         self.app = current_app
         self.event_dispatcher = Mock()
         self.controller = WorkController()
@@ -243,8 +243,8 @@ class test_ControlPanel(Case):
         panel = self.create_panel(consumer=consumer)
         self.assertFalse(panel.handle("dump_schedule"))
         r = TaskRequest(mytask.name, "CAFEBABE", (), {})
-        consumer.eta_schedule.schedule.enter(
-                consumer.eta_schedule.Entry(lambda x: x, (r, )),
+        consumer.timer.schedule.enter(
+                consumer.timer.Entry(lambda x: x, (r, )),
                     datetime.now() + timedelta(seconds=10))
         self.assertTrue(panel.handle("dump_schedule"))
 

+ 53 - 82
celery/tests/worker/test_worker.py

@@ -24,7 +24,7 @@ from celery.utils import uuid
 from celery.worker import WorkController, Queues, Timers
 from celery.worker.buckets import FastQueue
 from celery.worker.job import Request
-from celery.worker.consumer import Consumer as MainConsumer
+from celery.worker.consumer import BlockingConsumer
 from celery.worker.consumer import QoS, RUN, PREFETCH_COUNT_MAX, CLOSE
 from celery.utils.serialization import pickle
 from celery.utils.timer2 import Timer
@@ -37,7 +37,7 @@ class PlaceHolder(object):
         pass
 
 
-class MyKombuConsumer(MainConsumer):
+class MyKombuConsumer(BlockingConsumer):
     broadcast_consumer = Mock()
     task_consumer = Mock()
 
@@ -208,14 +208,13 @@ class test_Consumer(Case):
 
     def setUp(self):
         self.ready_queue = FastQueue()
-        self.eta_schedule = Timer()
+        self.timer = Timer()
 
     def tearDown(self):
-        self.eta_schedule.stop()
+        self.timer.stop()
 
     def test_info(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.qos = QoS(l.task_consumer, 10)
         info = l.info
         self.assertEqual(info["prefetch_count"], 10)
@@ -226,14 +225,12 @@ class test_Consumer(Case):
         self.assertTrue(info["broker"])
 
     def test_start_when_closed(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                            send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l._state = CLOSE
         l.start()
 
     def test_connection(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 
         l.reset_connection()
         self.assertIsInstance(l.connection, BrokerConnection)
@@ -258,13 +255,11 @@ class test_Consumer(Case):
         self.assertIsNone(l.task_consumer)
 
     def test_close_connection(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l._state = RUN
         l.close_connection()
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         eventer = l.event_dispatcher = Mock()
         eventer.enabled = True
         heart = l.heart = MockHeart()
@@ -275,8 +270,7 @@ class test_Consumer(Case):
 
     @patch("celery.worker.consumer.warn")
     def test_receive_message_unknown(self, warn):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         backend = Mock()
         m = create_message(backend, unknown={"baz": "!!!"})
         l.event_dispatcher = Mock()
@@ -288,8 +282,7 @@ class test_Consumer(Case):
     @patch("celery.utils.timer2.to_timestamp")
     def test_receive_message_eta_OverflowError(self, to_timestamp):
         to_timestamp.side_effect = OverflowError()
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         m = create_message(Mock(), task=foo_task.name,
                                    args=("2, 2"),
                                    kwargs={},
@@ -304,8 +297,7 @@ class test_Consumer(Case):
 
     @patch("celery.worker.consumer.error")
     def test_receive_message_InvalidTaskError(self, error):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         m = create_message(Mock(), task=foo_task.name,
                            args=(1, 2), kwargs="foobarbaz", id=1)
         l.update_strategies()
@@ -317,8 +309,7 @@ class test_Consumer(Case):
 
     @patch("celery.worker.consumer.crit")
     def test_on_decode_error(self, crit):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 
         class MockMessage(Mock):
             content_type = "application/x-msgpack"
@@ -331,8 +322,7 @@ class test_Consumer(Case):
         self.assertIn("Can't decode message body", crit.call_args[0][0])
 
     def test_receieve_message(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         m = create_message(Mock(), task=foo_task.name,
                            args=[2, 4, 8], kwargs={})
         l.update_strategies()
@@ -344,11 +334,11 @@ class test_Consumer(Case):
         self.assertIsInstance(in_bucket, Request)
         self.assertEqual(in_bucket.name, foo_task.name)
         self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
-        self.assertTrue(self.eta_schedule.empty())
+        self.assertTrue(self.timer.empty())
 
     def test_start_connection_error(self):
 
-        class MockConsumer(MainConsumer):
+        class MockConsumer(BlockingConsumer):
             iterations = 0
 
             def consume_messages(self):
@@ -357,19 +347,19 @@ class test_Consumer(Case):
                     raise KeyError("foo")
                 raise SyntaxError("bar")
 
-        l = MockConsumer(self.ready_queue, self.eta_schedule,
+        l = MockConsumer(self.ready_queue, timer=self.timer,
                              send_events=False, pool=BasePool())
         l.connection_errors = (KeyError, )
         with self.assertRaises(SyntaxError):
             l.start()
         l.heart.stop()
-        l.priority_timer.stop()
+        l.timer.stop()
 
     def test_start_channel_error(self):
         # Regression test for AMQPChannelExceptions that can occur within the
         # consumer. (i.e. 404 errors)
 
-        class MockConsumer(MainConsumer):
+        class MockConsumer(BlockingConsumer):
             iterations = 0
 
             def consume_messages(self):
@@ -378,13 +368,13 @@ class test_Consumer(Case):
                     raise KeyError("foo")
                 raise SyntaxError("bar")
 
-        l = MockConsumer(self.ready_queue, self.eta_schedule,
+        l = MockConsumer(self.ready_queue, timer=self.timer,
                              send_events=False, pool=BasePool())
 
         l.channel_errors = (KeyError, )
         self.assertRaises(SyntaxError, l.start)
         l.heart.stop()
-        l.priority_timer.stop()
+        l.timer.stop()
 
     def test_consume_messages_ignores_socket_timeout(self):
 
@@ -395,8 +385,7 @@ class test_Consumer(Case):
                 self.obj.connection = None
                 raise socket.timeout(10)
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                            send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.connection = Connection()
         l.task_consumer = Mock()
         l.connection.obj = l
@@ -412,8 +401,7 @@ class test_Consumer(Case):
                 self.obj.connection = None
                 raise socket.error("foo")
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                            send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l._state = RUN
         c = l.connection = Connection()
         l.connection.obj = l
@@ -434,8 +422,7 @@ class test_Consumer(Case):
             def drain_events(self, **kwargs):
                 self.obj.connection = None
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.connection = Connection()
         l.connection.obj = l
         l.task_consumer = Mock()
@@ -450,8 +437,7 @@ class test_Consumer(Case):
         l.task_consumer.qos.assert_called_with(prefetch_count=9)
 
     def test_maybe_conn_error(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.connection_errors = (KeyError, )
         l.channel_errors = (SyntaxError, )
         l.maybe_conn_error(Mock(side_effect=AttributeError("foo")))
@@ -462,8 +448,7 @@ class test_Consumer(Case):
 
     def test_apply_eta_task(self):
         from celery.worker import state
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.qos = QoS(None, 10)
 
         task = object()
@@ -474,8 +459,7 @@ class test_Consumer(Case):
         self.assertIs(self.ready_queue.get_nowait(), task)
 
     def test_receieve_message_eta_isoformat(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         m = create_message(Mock(), task=foo_task.name,
                            eta=datetime.now().isoformat(),
                            args=[2, 4, 8], kwargs={})
@@ -486,20 +470,19 @@ class test_Consumer(Case):
         l.enabled = False
         l.update_strategies()
         l.receive_message(m.decode(), m)
-        l.eta_schedule.stop()
+        l.timer.stop()
 
-        items = [entry[2] for entry in self.eta_schedule.queue]
+        items = [entry[2] for entry in self.timer.queue]
         found = 0
         for item in items:
             if item.args[0].name == foo_task.name:
                 found = True
         self.assertTrue(found)
         self.assertTrue(l.task_consumer.qos.call_count)
-        l.eta_schedule.stop()
+        l.timer.stop()
 
     def test_on_control(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                             send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.pidbox_node = Mock()
         l.reset_pidbox_node = Mock()
 
@@ -519,8 +502,7 @@ class test_Consumer(Case):
 
     def test_revoke(self):
         ready_queue = FastQueue()
-        l = MyKombuConsumer(ready_queue, self.eta_schedule,
-                           send_events=False)
+        l = MyKombuConsumer(ready_queue, timer=self.timer)
         backend = Mock()
         id = uuid()
         t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
@@ -532,8 +514,7 @@ class test_Consumer(Case):
         self.assertTrue(ready_queue.empty())
 
     def test_receieve_message_not_registered(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         backend = Mock()
         m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
 
@@ -541,13 +522,12 @@ class test_Consumer(Case):
         self.assertFalse(l.receive_message(m.decode(), m))
         with self.assertRaises(Empty):
             self.ready_queue.get_nowait()
-        self.assertTrue(self.eta_schedule.empty())
+        self.assertTrue(self.timer.empty())
 
     @patch("celery.worker.consumer.warn")
     @patch("celery.worker.consumer.logger")
     def test_receieve_message_ack_raises(self, logger, warn):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         backend = Mock()
         m = create_message(backend, args=[2, 4, 8], kwargs={})
 
@@ -559,13 +539,12 @@ class test_Consumer(Case):
         self.assertTrue(warn.call_count)
         with self.assertRaises(Empty):
             self.ready_queue.get_nowait()
-        self.assertTrue(self.eta_schedule.empty())
+        self.assertTrue(self.timer.empty())
         m.reject.assert_called_with()
         self.assertTrue(logger.critical.call_count)
 
     def test_receieve_message_eta(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                            send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.event_dispatcher = Mock()
         l.event_dispatcher._outbound_buffer = deque()
         backend = Mock()
@@ -584,8 +563,8 @@ class test_Consumer(Case):
         l.stop_consumers()
         l.event_dispatcher = Mock()
         l.receive_message(m.decode(), m)
-        l.eta_schedule.stop()
-        in_hold = self.eta_schedule.queue[0]
+        l.timer.stop()
+        in_hold = l.timer.queue[0]
         self.assertEqual(len(in_hold), 3)
         eta, priority, entry = in_hold
         task = entry.args[0]
@@ -596,8 +575,7 @@ class test_Consumer(Case):
             self.ready_queue.get_nowait()
 
     def test_reset_pidbox_node(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.pidbox_node = Mock()
         chan = l.pidbox_node.channel = Mock()
         l.connection = Mock()
@@ -607,16 +585,14 @@ class test_Consumer(Case):
         chan.close.assert_called_with()
 
     def test_reset_pidbox_node_green(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.pool = Mock()
         l.pool.is_green = True
         l.reset_pidbox_node()
         l.pool.spawn_n.assert_called_with(l._green_pidbox_node)
 
     def test__green_pidbox_node(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.pidbox_node = Mock()
 
         class BConsumer(Mock):
@@ -673,8 +649,7 @@ class test_Consumer(Case):
     @patch("kombu.connection.BrokerConnection._establish_connection")
     @patch("kombu.utils.sleep")
     def test_open_connection_errback(self, sleep, connect):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                      send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         from kombu.transport.memory import Transport
         Transport.connection_errors = (StdChannelError, )
 
@@ -687,8 +662,7 @@ class test_Consumer(Case):
         connect.assert_called_with()
 
     def test_stop_pidbox_node(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
-                      send_events=False)
+        l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l._pidbox_node_stopped = Event()
         l._pidbox_node_shutdown = Event()
         l._pidbox_node_stopped.set()
@@ -711,8 +685,8 @@ class test_Consumer(Case):
                     raise KeyError("foo")
 
         init_callback = Mock()
-        l = _Consumer(self.ready_queue, self.eta_schedule,
-                      send_events=False, init_callback=init_callback)
+        l = _Consumer(self.ready_queue, timer=self.timer,
+                      init_callback=init_callback)
         l.task_consumer = Mock()
         l.broadcast_consumer = Mock()
         l.qos = _QoS()
@@ -734,7 +708,7 @@ class test_Consumer(Case):
         self.assertEqual(l.qos.prev, l.qos.value)
 
         init_callback.reset_mock()
-        l = _Consumer(self.ready_queue, self.eta_schedule,
+        l = _Consumer(self.ready_queue, timer=self.timer,
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
         l.task_consumer = Mock()
@@ -747,21 +721,18 @@ class test_Consumer(Case):
         self.assertTrue(l.consume_messages.call_count)
 
     def test_reset_connection_with_no_node(self):
-        l = MainConsumer(self.ready_queue, self.eta_schedule,
-                         send_events=False)
+        l = BlockingConsumer(self.ready_queue, timer=self.timer)
         self.assertEqual(None, l.pool)
         l.reset_connection()
 
     def test_on_task_revoked(self):
-        l = MainConsumer(self.ready_queue, self.eta_schedule,
-                         send_events=False)
+        l = BlockingConsumer(self.ready_queue, timer=self.timer)
         task = Mock()
         task.revoked.return_value = True
         l.on_task(task)
 
     def test_on_task_no_events(self):
-        l = MainConsumer(self.ready_queue, self.eta_schedule,
-                         send_events=False)
+        l = BlockingConsumer(self.ready_queue, timer=self.timer)
         task = Mock()
         task.revoked.return_value = False
         l.event_dispatcher = Mock()
@@ -834,8 +805,8 @@ class test_WorkController(AppCase):
 
     def test_attrs(self):
         worker = self.worker
-        self.assertIsInstance(worker.scheduler, Timer)
-        self.assertTrue(worker.scheduler)
+        self.assertIsInstance(worker.timer, Timer)
+        self.assertTrue(worker.timer)
         self.assertTrue(worker.pool)
         self.assertTrue(worker.consumer)
         self.assertTrue(worker.mediator)
@@ -848,7 +819,7 @@ class test_WorkController(AppCase):
 
     def test_with_autoscaler(self):
         worker = self.create_worker(autoscale=[10, 3], send_events=False,
-                                eta_scheduler_cls="celery.utils.timer2.Timer")
+                                timer_cls="celery.utils.timer2.Timer")
         self.assertTrue(worker.autoscaler)
 
     def test_dont_stop_or_terminate(self):

+ 1 - 1
celery/utils/timer2.py

@@ -83,7 +83,7 @@ class Schedule(object):
 
     on_error = None
 
-    def __init__(self, max_interval=None, on_error=None):
+    def __init__(self, max_interval=None, on_error=None, **kwargs):
         self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
         self.on_error = on_error or self.on_error
         self._queue = []

+ 16 - 14
celery/worker/__init__.py

@@ -89,8 +89,6 @@ class Pool(abstract.StartStopComponent):
         w.no_execv = no_execv
         if w.autoscale:
             w.max_concurrency, w.min_concurrency = w.autoscale
-        w.use_eventloop = (detect_environment() == "default" and
-                           w.app.broker_connection().is_evented)
 
     def create(self, w):
         forking_enable(w.no_execv or not w.force_execv)
@@ -149,24 +147,26 @@ class Timers(abstract.Component):
     requires = ("pool", )
 
     def create(self, w):
+        options = {"on_error": self.on_timer_error,
+                   "on_tick": self.on_timer_tick}
+
         if w.use_eventloop:
-            w.scheduler = w.priority_timer = Schedule(max_interval=10)
+            # the timers are fired by the hub, so don't use the Timer thread.
+            w.timer = Schedule(max_interval=10, **options)
         else:
-            w.priority_timer = self.instantiate(w.pool.Timer)
-            if not w.eta_scheduler_cls:
+            if not w.timer_cls:
                 # Default Timer is set by the pool, as e.g. eventlet
                 # needs a custom implementation.
-                w.eta_scheduler_cls = w.pool.Timer
-            w.scheduler = self.instantiate(w.eta_scheduler_cls,
-                                    max_interval=w.eta_scheduler_precision,
-                                    on_error=self.on_timer_error,
-                                    on_tick=self.on_timer_tick)
+                w.timer_cls = w.pool.Timer
+            w.timer = self.instantiate(w.pool.Timer,
+                                       max_interval=w.timer_precision,
+                                       **options)
 
     def on_timer_error(self, exc):
         logger.error("Timer error: %r", exc, exc_info=True)
 
     def on_timer_tick(self, delay):
-        logger.debug("Scheduler wake-up! Next eta %s secs.", delay)
+        logger.debug("Timer wake-up! Next eta %s secs.", delay)
 
 
 class StateDB(abstract.Component):
@@ -196,8 +196,8 @@ class WorkController(configurated):
     pool_cls = from_config("pool")
     consumer_cls = from_config("consumer")
     mediator_cls = from_config("mediator")
-    eta_scheduler_cls = from_config("eta_scheduler")
-    eta_scheduler_precision = from_config()
+    timer_cls = from_config("timer")
+    timer_precision = from_config("timer_precision")
     autoscaler_cls = from_config("autoscaler")
     autoreloader_cls = from_config("autoreloader")
     schedule_filename = from_config()
@@ -238,6 +238,8 @@ class WorkController(configurated):
         self._finalize = Finalize(self, self.stop, exitpriority=1)
         self.pidfile = pidfile
         self.pidlock = None
+        self.use_eventloop = (detect_environment() == "default" and
+                              self.app.broker_connection().is_evented)
 
         # Initialize boot steps
         self.pool_cls = _concurrency.get_implementation(self.pool_cls)
@@ -323,7 +325,7 @@ class WorkController(configurated):
                 stop = getattr(component, "terminate", None) or stop
             stop()
 
-        self.priority_timer.stop()
+        self.timer.stop()
         self.consumer.close_connection()
 
         if self.pidlock:

+ 58 - 60
celery/worker/consumer.py

@@ -46,7 +46,7 @@ up and running.
   are acknowledged immediately and logged, so the message is not resent
   again, and again.
 
-* If the task has an ETA/countdown, the task is moved to the `eta_schedule`
+* If the task has an ETA/countdown, the task is moved to the `timer`
   so the :class:`timer2.Timer` can schedule it at its
   deadline. Tasks without an eta are moved immediately to the `ready_queue`,
   so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
@@ -157,19 +157,22 @@ class Component(StartStopComponent):
     name = "worker.consumer"
     last = True
 
+    def Consumer(self, w):
+        return (w.consumer_cls or
+                Consumer if w.use_eventloop else BlockingConsumer)
+
     def create(self, w):
         prefetch_count = w.concurrency * w.prefetch_multiplier
-        c = w.consumer = self.instantiate(
-                w.consumer_cls, w.ready_queue, w.scheduler,
+        c = w.consumer = self.instantiate(self.Consumer(w),
+                w.ready_queue,
                 hostname=w.hostname,
                 send_events=w.send_events,
                 init_callback=w.ready_callback,
                 initial_prefetch_count=prefetch_count,
                 pool=w.pool,
-                priority_timer=w.priority_timer,
+                timer=w.timer,
                 app=w.app,
-                controller=w,
-                use_eventloop=w.use_eventloop)
+                controller=w)
         return c
 
 
@@ -244,16 +247,13 @@ class Consumer(object):
     move them to the ready queue for task processing.
 
     :param ready_queue: See :attr:`ready_queue`.
-    :param eta_schedule: See :attr:`eta_schedule`.
+    :param timer: See :attr:`timer`.
 
     """
 
     #: The queue that holds tasks ready for immediate processing.
     ready_queue = None
 
-    #: Timer for tasks with an ETA/countdown.
-    eta_schedule = None
-
     #: Enable/disable events.
     send_events = False
 
@@ -295,27 +295,21 @@ class Consumer(object):
 
     #: A timer used for high-priority internal tasks, such
     #: as sending heartbeats.
-    priority_timer = None
+    timer = None
 
     # Consumer state, can be RUN or CLOSE.
     _state = None
 
-    #: If true then pool results and broker messages will be
-    #: handled in an event loop.
-    use_eventloop = False
-
-    def __init__(self, ready_queue, eta_schedule,
+    def __init__(self, ready_queue,
             init_callback=noop, send_events=False, hostname=None,
             initial_prefetch_count=2, pool=None, app=None,
-            priority_timer=None, controller=None, use_eventloop=False,
-            **kwargs):
+            timer=None, controller=None, **kwargs):
         self.app = app_or_default(app)
         self.connection = None
         self.task_consumer = None
         self.controller = controller
         self.broadcast_consumer = None
         self.ready_queue = ready_queue
-        self.eta_schedule = eta_schedule
         self.send_events = send_events
         self.init_callback = init_callback
         self.hostname = hostname or socket.gethostname()
@@ -323,8 +317,7 @@ class Consumer(object):
         self.event_dispatcher = None
         self.heart = None
         self.pool = pool
-        self.priority_timer = priority_timer or timer2.default_timer
-        self.use_eventloop = use_eventloop
+        self.timer = timer or timer2.default_timer
         pidbox_state = AttributeDict(app=self.app,
                                      hostname=self.hostname,
                                      listener=self,     # pre 2.2
@@ -362,56 +355,40 @@ class Consumer(object):
             except self.connection_errors + self.channel_errors:
                 error(RETRY_CONNECTION, exc_info=True)
 
-    def consume_messages(self):
+    def consume_messages(self, sleep=sleep, min=min, Empty=Empty):
+        """Consume messages forever (or until an exception is raised)."""
         self.task_consumer.consume()
         debug("Ready to accept tasks!")
 
-        # evented version
-        if self.use_eventloop:
-            return self._eventloop()
-
-        while self._state != CLOSE and self.connection:
-            if state.should_stop:
-                raise SystemExit()
-            elif state.should_terminate:
-                raise SystemTerminate()
-            if self.qos.prev != self.qos.value:     # pragma: no cover
-                self.qos.update()
-            try:
-                self.connection.drain_events(timeout=1.0)
-            except socket.timeout:
-                pass
-            except socket.error:
-                if self._state != CLOSE:            # pragma: no cover
-                    raise
-
-    def _eventloop(self):
-        """Consume messages forever (or until an exception is raised)."""
-        on_poll_start = self.connection.transport.on_poll_start
-
-        qos = self.qos
-        with Hub(self.priority_timer) as hub:
-            update = hub.update
+        with Hub(self.timer) as hub:
+            qos = self.qos
+            update_qos = qos.update
+            update_fds = hub.update
             fdmap = hub.fdmap
             poll = hub.poller.poll
             fire_timers = hub.fire_timers
-            scheduled = hub.schedule._queue
-            update(self.connection.eventmap,
-                       self.pool.eventmap)
-            self.connection.transport.on_poll_init(hub.poller)
+            scheduled = hub.timer._queue
+            transport = self.connection.transport
+            on_poll_start = transport.on_poll_start
+
+            update_fds(self.connection.eventmap, self.pool.eventmap)
+            transport.on_poll_init(hub.poller)
 
             while self._state != CLOSE and self.connection:
+                # shutdown if signal handlers told us to.
                 if state.should_stop:
                     raise SystemExit()
                 elif state.should_terminate:
                     raise SystemTerminate()
 
+                # fire any ready timers, this also determines
+                # when we need to wake up next.
                 time_to_sleep = fire_timers() if scheduled else 1
 
-                if qos.prev != qos.value:     # pragma: no cover
-                    qos.update()
+                if qos.prev != qos.value:
+                    update_qos()
 
-                update(on_poll_start())
+                update_fds(on_poll_start())
                 if fdmap:
                     for fileno, event in poll(time_to_sleep) or ():
                         try:
@@ -455,9 +432,8 @@ class Consumer(object):
                 task.acknowledge()
             else:
                 self.qos.increment()
-                self.eta_schedule.apply_at(eta,
-                                           self.apply_eta_task, (task, ),
-                                           priority=6)
+                self.timer.apply_at(eta, self.apply_eta_task, (task, ),
+                                    priority=6)
         else:
             state.task_reserved(task)
             self.ready_queue.put(task)
@@ -643,7 +619,7 @@ class Consumer(object):
         # They can't be acked anyway, as a delivery tag is specific
         # to the current channel.
         self.ready_queue.clear()
-        self.eta_schedule.clear()
+        self.timer.clear()
 
         # Re-establish the broker connection and setup the task consumer.
         self.connection = self._open_connection()
@@ -685,7 +661,7 @@ class Consumer(object):
         can tell if the worker is off-line/missing.
 
         """
-        self.heart = Heart(self.priority_timer, self.event_dispatcher)
+        self.heart = Heart(self.timer, self.event_dispatcher)
         self.heart.start()
 
     def _open_connection(self):
@@ -743,3 +719,25 @@ class Consumer(object):
             conninfo = self.connection.info()
             conninfo.pop("password", None)  # don't send password.
         return {"broker": conninfo, "prefetch_count": self.qos.value}
+
+
+class BlockingConsumer(Consumer):
+
+    def consume_messages(self):
+        self.task_consumer.consume()
+        debug("Ready to accept tasks!")
+
+        while self._state != CLOSE and self.connection:
+            if state.should_stop:
+                raise SystemExit()
+            elif state.should_terminate:
+                raise SystemTerminate()
+            if self.qos.prev != self.qos.value:     # pragma: no cover
+                self.qos.update()
+            try:
+                self.connection.drain_events(timeout=10.0)
+            except socket.timeout:
+                pass
+            except socket.error:
+                if self._state != CLOSE:            # pragma: no cover
+                    raise

+ 1 - 1
celery/worker/control.py

@@ -144,7 +144,7 @@ def time_limit(panel, task_name=None, hard=None, soft=None, **kwargs):
 
 @Panel.register
 def dump_schedule(panel, safe=False, **kwargs):
-    schedule = panel.consumer.eta_schedule.schedule
+    schedule = panel.consumer.timer.schedule
     if not schedule.queue:
         logger.info("--Empty schedule--")
         return []
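
As context (not part of the diff): dump_schedule is the control command behind the scheduled inspect call, so a client can still see the renamed timer attribute end to end. A hedged sketch, assuming a configured broker and a running worker:

    from celery import current_app as celery

    # inspect().scheduled() issues the "dump_schedule" remote control command,
    # which now reads consumer.timer.schedule instead of consumer.eta_schedule.
    print(celery.control.inspect(timeout=1).scheduled())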

+ 4 - 4
celery/worker/hub.py

@@ -9,10 +9,10 @@ from celery.utils.timer2 import Schedule
 class Hub(object):
     eventflags = POLL_READ | POLL_ERR
 
-    def __init__(self, schedule=None):
+    def __init__(self, timer=None):
         self.fdmap = {}
         self.poller = poll()
-        self.schedule = Schedule() if schedule is None else schedule
+        self.timer = Schedule() if timer is None else timer
 
     def __enter__(self):
         return self
@@ -25,7 +25,7 @@ class Hub(object):
             delay, entry = self.scheduler.next()
             if entry is None:
                 break
-            self.schedule.apply_entry(entry)
+            self.timer.apply_entry(entry)
         return min(max(delay, min_delay), max_delay)
 
     def add(self, fd, callback, flags=None):
@@ -51,4 +51,4 @@ class Hub(object):
 
     @cached_property
     def scheduler(self):
-        return iter(self.schedule)
+        return iter(self.timer)

+ 5 - 5
docs/configuration.rst

@@ -1107,10 +1107,10 @@ Can also be set via the :option:`--statedb` argument to
 
 Not enabled by default.
 
-.. setting:: CELERYD_ETA_SCHEDULER_PRECISION
+.. setting:: CELERYD_TIMER_PRECISION
 
-CELERYD_ETA_SCHEDULER_PRECISION
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+CELERYD_TIMER_PRECISION
+~~~~~~~~~~~~~~~~~~~~~~~
 
 Set the maximum time in seconds that the ETA scheduler can sleep between
 rechecking the schedule.  Default is 1 second.
@@ -1479,9 +1479,9 @@ CELERYD_MEDIATOR
 Name of the mediator class used by the worker.
 Default is :class:`celery.worker.controllers.Mediator`.
 
-.. setting:: CELERYD_ETA_SCHEDULER
+.. setting:: CELERYD_TIMER
 
-CELERYD_ETA_SCHEDULER
+CELERYD_TIMER
 ~~~~~~~~~~~~~~~~~~~~~
 
 Name of the ETA scheduler class used by the worker.