Browse Source

Autoreloader now binds to eventloop if kqueue used

Ask Solem 13 years ago
parent
commit
606ba93fb2
1 changed files with 56 additions and 28 deletions
  1. 56 28
      celery/worker/autoreload.py

+ 56 - 28
celery/worker/autoreload.py

@@ -17,6 +17,8 @@ import time
 
 from collections import defaultdict
 
+from kombu.utils import eventio
+
 from celery.platforms import ignore_EBADF
 from celery.utils.imports import module_file
 from celery.utils.log import get_logger
@@ -42,11 +44,20 @@ class WorkerComponent(StartStopComponent):
         self.enabled = w.autoreload = autoreload
         w.autoreloader = None
 
-    def create(self, w):
-        w.autoreloader = self.instantiate(w.autoreloader_cls,
-                                          controller=w)
+    def create_ev(self, w):
+        ar = w.autoreloader = self.instantiate(w.autoreloader_cls, w)
+        w.hub.on_init.append(ar.on_poll_init)
+        w.hub.on_close.append(ar.on_poll_close)
+
+    def create_threaded(self, w):
+        w.autoreloader = self.instantiate(w.autoreloader_cls, w)
         return w.autoreloader
 
+    def create(self, w):
+        if hasattr(select, "kqueue") and w.use_eventloop:
+            return self.create_ev(w)
+        return self.create_threaded(w)
+
 
 def file_hash(filename, algorithm="md5"):
     hobj = hashlib.new(algorithm)
@@ -107,40 +118,46 @@ class KQueueMonitor(BaseMonitor):
     """File change monitor based on BSD kernel event notifications"""
 
     def __init__(self, *args, **kwargs):
-        assert hasattr(select, "kqueue")
         super(KQueueMonitor, self).__init__(*args, **kwargs)
         self.filemap = dict((f, None) for f in self.files)
+        self.fdmap = {}
 
-    def start(self):
-        self._kq = select.kqueue()
-        kevents = []
+    def on_poll_init(self, hub):
+        self.add_events(hub.poller)
+        hub.poller.on_file_change = self.handle_event
+
+    def on_poll_close(self, hub):
+        self.close(hub.poller)
+
+    def add_events(self, poller):
         for f in self.filemap:
             self.filemap[f] = fd = os.open(f, os.O_RDONLY)
+            self.fdmap[fd] = f
+            poller.watch_file(fd)
 
-            ev = select.kevent(fd,
-                    filter=select.KQ_FILTER_VNODE,
-                    flags=select.KQ_EV_ADD |
-                            select.KQ_EV_ENABLE |
-                            select.KQ_EV_CLEAR,
-                    fflags=select.KQ_NOTE_WRITE |
-                            select.KQ_NOTE_EXTEND)
-            kevents.append(ev)
+    def handle_event(self, events):
+        self.on_change([self.fdmap[e.ident] for e in events])
 
-        events = self._kq.control(kevents, 0)
+    def start(self):
+        self.poller = eventio.poll()
+        self.add_events(self.poller._kcontrol)
+        self.poller.on_file_change = self.handle_event
         while not self.shutdown_event.is_set():
-            events = self._kq.control(kevents, 1)
-            fds = [e.ident for e in events]
-            modified = [k for k, v in self.filemap.iteritems()
-                                        if v in fds]
-            self.on_change(modified)
+            self.poller.poll(1)
 
-    def stop(self):
-        self._kq.close()
-        for fd in filter(None, self.filemap.values()):
+    def close(self, poller):
+        for f, fd in filter(None, self.filemap.iteritems()):
+            poller.unregister(fd)
             with ignore_EBADF():  # pragma: no cover
                 os.close(fd)
-            self.filemap[fd] = None
+            self.filemap.pop(f, None)
+            self.fdmap.pop(fd, None)
         self.filemap.clear()
+        self.fdmap.clear()
+
+    def stop(self):
+        self._kq.close()
+        self.close(self.poller)
 
 
 class InotifyMonitor(_ProcessEvent):
@@ -207,15 +224,26 @@ class Autoreloader(bgThread):
         app = self.controller.app
         self.modules = app.loader.task_modules if modules is None else modules
         self.options = options
-        self.Monitor = monitor_cls or self.Monitor
         self._monitor = None
         self._hashes = None
 
-    def body(self):
+    def on_init(self):
         files = [module_file(sys.modules[m]) for m in self.modules]
+        self._hashes = dict((f, file_hash(f)) for f in files)
         self._monitor = self.Monitor(files, self.on_change,
                 shutdown_event=self._is_shutdown, **self.options)
-        self._hashes = dict([(f, file_hash(f)) for f in files])
+
+    def on_poll_init(self, hub):
+        if self._monitor is None:
+            self.on_init()
+        self._monitor.on_poll_init(hub)
+
+    def on_poll_close(self, hub):
+        if self._monitor is not None:
+            self._monitor.on_poll_close(hub)
+
+    def body(self):
+        self.on_init()
         try:
             self._monitor.start()
         except OSError, exc: