Browse Source

Don't use timer threads if evented, simply plug them into the hub

Ask Solem 13 years ago
parent
commit
3a53c99255
3 changed files with 49 additions and 39 deletions
  1. 13 9
      celery/worker/__init__.py
  2. 21 12
      celery/worker/consumer.py
  3. 15 18
      celery/worker/hub.py

+ 13 - 9
celery/worker/__init__.py

@@ -33,6 +33,7 @@ from celery.utils.functional import noop
 from celery.utils.imports import qualname, reload_from_cwd
 from celery.utils.log import get_logger
 from celery.utils.threads import Event
+from celery.utils.timer2 import Schedule
 
 from . import abstract
 from . import state
@@ -148,15 +149,18 @@ class Timers(abstract.Component):
     requires = ("pool", )
 
     def create(self, w):
-        w.priority_timer = self.instantiate(w.pool.Timer)
-        if not w.eta_scheduler_cls:
-            # Default Timer is set by the pool, as e.g. eventlet
-            # needs a custom implementation.
-            w.eta_scheduler_cls = w.pool.Timer
-        w.scheduler = self.instantiate(w.eta_scheduler_cls,
-                                precision=w.eta_scheduler_precision,
-                                on_error=self.on_timer_error,
-                                on_tick=self.on_timer_tick)
+        if w.use_eventloop:
+            w.scheduler = w.priority_timer = Schedule(max_interval=10)
+        else:
+            w.priority_timer = self.instantiate(w.pool.Timer)
+            if not w.eta_scheduler_cls:
+                # Default Timer is set by the pool, as e.g. eventlet
+                # needs a custom implementation.
+                w.eta_scheduler_cls = w.pool.Timer
+            w.scheduler = self.instantiate(w.eta_scheduler_cls,
+                                    max_interval=w.eta_scheduler_precision,
+                                    on_error=self.on_timer_error,
+                                    on_tick=self.on_timer_tick)
 
     def on_timer_error(self, exc):
         logger.error("Timer error: %r", exc, exc_info=True)

+ 21 - 12
celery/worker/consumer.py

@@ -80,6 +80,7 @@ import logging
 import socket
 import threading
 
+from time import sleep
 from Queue import Empty
 
 from kombu.utils.encoding import safe_repr
@@ -389,10 +390,12 @@ class Consumer(object):
         on_poll_start = self.connection.transport.on_poll_start
 
         qos = self.qos
-        with Hub() as hub:
+        with Hub(self.priority_timer) as hub:
             update = hub.update
             fdmap = hub.fdmap
             poll = hub.poller.poll
+            fire_timers = hub.fire_timers
+            scheduled = hub.schedule._queue
             update(self.connection.eventmap,
                        self.pool.eventmap)
             self.connection.transport.on_poll_init(hub.poller)
@@ -402,20 +405,25 @@ class Consumer(object):
                     raise SystemExit()
                 elif state.should_terminate:
                     raise SystemTerminate()
-                if not fdmap:
-                    return
+
+                time_to_sleep = fire_timers() if scheduled else 1
+
                 if qos.prev != qos.value:     # pragma: no cover
                     qos.update()
 
                 update(on_poll_start())
-                for fileno, event in poll(100.0) or ():
-                    try:
-                        fdmap[fileno](fileno, event)
-                    except Empty:
-                        pass
-                    except socket.error:
-                        if self._state != CLOSE:        # pragma: no cover
-                            raise
+                if fdmap:
+                    for fileno, event in poll(time_to_sleep) or ():
+                        try:
+                            fdmap[fileno](fileno, event)
+                        except Empty:
+                            pass
+                        except socket.error:
+                            if self._state != CLOSE:        # pragma: no cover
+                                raise
+                else:
+                    sleep(min(time_to_sleep, 1))
+
 
     def on_task(self, task):
         """Handle received task.
@@ -449,7 +457,8 @@ class Consumer(object):
             else:
                 self.qos.increment()
                 self.eta_schedule.apply_at(eta,
-                                           self.apply_eta_task, (task, ))
+                                           self.apply_eta_task, (task, ),
+                                           priority=6)
         else:
             state.task_reserved(task)
             self.ready_queue.put(task)

+ 15 - 18
celery/worker/hub.py

@@ -1,19 +1,18 @@
 from __future__ import absolute_import
 
-import errno
-import socket
-
-from time import sleep
-
+from kombu.utils import cached_property
 from kombu.utils.eventio import poll, POLL_READ, POLL_ERR
 
+from celery.utils.timer2 import Schedule
+
 
 class Hub(object):
     eventflags = POLL_READ | POLL_ERR
 
-    def __init__(self):
+    def __init__(self, schedule=None):
         self.fdmap = {}
         self.poller = poll()
+        self.schedule = Schedule() if schedule is None else schedule
 
     def __enter__(self):
         return self
@@ -21,6 +20,12 @@ class Hub(object):
     def __exit__(self, *exc_info):
         return self.close()
 
+    def fire_timers(self, min_delay=10, max_delay=10):
+        delay, entry = self.scheduler.next()
+        if entry is not None:
+            self.schedule.apply_entry(entry)
+        return min(max(delay, min_delay), max_delay)
+
     def add(self, fd, callback, flags=None):
         flags = self.eventflags if flags is None else flags
         self.poller.register(fd, flags)
@@ -39,17 +44,9 @@ class Hub(object):
         except (KeyError, OSError):
             pass
 
-    def tick(self, timeout=1.0):
-        if not self.fdmap:
-            return sleep(0.1)
-        for fileno, event in self.poller.poll(timeout) or ():
-            try:
-                self.fdmap[fileno](fileno, event)
-            except socket.timeout:
-                pass
-            except socket.error, exc:
-                if exc.errno != errno.EAGAIN:
-                    raise
-
     def close(self):
         [self.remove(fd) for fd in self.fdmap.keys()]
+
+    @cached_property
+    def scheduler(self):
+        return iter(self.schedule)