Ask Solem 13 år sedan
förälder
incheckning
cc10019b3d
2 ändrade filer med 93 tillägg och 82 borttagningar
  1. 1 1
      celery/app/defaults.py
  2. 92 81
      celery/worker/autoreload.py

+ 1 - 1
celery/app/defaults.py

@@ -151,7 +151,7 @@ NAMESPACES = {
     },
     "CELERYD": {
         "AUTOSCALER": Option("celery.worker.autoscale.Autoscaler"),
-        "AUTORELOADER": Option("celery.worker.autoreload.AutoReloader"),
+        "AUTORELOADER": Option("celery.worker.autoreload.Autoreloader"),
         "BOOT_STEPS": Option((), type="tuple"),
         "CONCURRENCY": Option(0, type="int"),
         "ETA_SCHEDULER": Option(None, type="string"),

+ 92 - 81
celery/worker/autoreload.py

@@ -13,20 +13,20 @@ import sys
 import time
 import select
 import hashlib
-import threading
 
 from collections import defaultdict
 
 from .. import current_app
 from ..abstract import StartStopComponent
+from ..utils.threads import bgThread, Event
 
 
 class WorkerComponent(StartStopComponent):
     name = "worker.autoreloader"
     requires = ("pool", )
 
-    def __init__(self, w, **kwargs):
-        self.enabled = w.autoreload
+    def __init__(self, w, autoreload=None, **kwargs):
+        self.enabled = w.autoreload = autoreload
         w.autoreloader = None
 
     def create(self, w):
@@ -36,104 +36,112 @@ class WorkerComponent(StartStopComponent):
         return w.autoreloader
 
 
-def file_hash(filename, algorithm='md5'):
+def file_hash(filename, algorithm="md5"):
     hobj = hashlib.new(algorithm)
-    with open(filename, 'rb') as f:
+    with open(filename, "rb") as f:
         for chunk in iter(lambda: f.read(2 ** 20), ''):
             hobj.update(chunk)
     return hobj.digest()
 
 
-class StatMonitor(object):
-    """File change monitor based on `stat` system call"""
-    def __init__(self, files, on_change=None, interval=0.5):
-        self._files = files
-        self._interval = interval
+class BaseMonitor(object):
+
+    def __init__(self, files, on_change=None, shutdown_event=None,
+            interval=0.5):
+        self.files = files
+        self.interval = interval
         self._on_change = on_change
-        self._modify_times = defaultdict(int)
+        self.modify_times = defaultdict(int)
+        self.shutdown_event = shutdown_event or Event()
 
     def start(self):
-        while True:
+        raise NotImplementedError("Subclass responsibility")
+
+    def stop(self):
+        pass
+
+    def on_change(self, modified):
+        if self._on_change:
+            return self._on_change(modified)
+
+
+class StatMonitor(BaseMonitor):
+    """File change monitor based on the ``stat`` system call."""
+
+    def start(self):
+        while not self.shutdown_event.is_set():
             modified = {}
-            for m in self._files:
+            for m in self.files:
                 mt = self._mtime(m)
                 if mt is None:
                     break
-                if self._modify_times[m] != mt:
+                if self.modify_times[m] != mt:
                     modified[m] = mt
             else:
                 if modified:
                     self.on_change(modified.keys())
-                    self._modify_times.update(modified)
-
-            time.sleep(self._interval)
-
-    def on_change(self, modified):
-        if self._on_change:
-            return self._on_change(modified)
+                    self.modify_times.update(modified)
+            time.sleep(self.interval)
 
-    @classmethod
-    def _mtime(cls, path):
+    @staticmethod
+    def _mtime(path):
         try:
             return os.stat(path).st_mtime
-        except:
-            return
-
+        except Exception:
+            pass
 
-class KQueueMonitor(object):
+class KQueueMonitor(BaseMonitor):
     """File change monitor based on BSD kernel event notifications"""
-    def __init__(self, files, on_change=None):
-        assert hasattr(select, 'kqueue')
-        self._files = dict([(f, None) for f in files])
-        self._on_change = on_change
+
+    def __init__(self, *args, **kwargs):
+        assert hasattr(select, "kqueue")
+        super(KQueueMonitor, self).__init__(*args, **kwargs)
+        self.filemap = dict((f, None) for f in self.files)
 
     def start(self):
-        try:
-            self._kq = select.kqueue()
-            kevents = []
-            for f in self._files:
-                self._files[f] = fd = os.open(f, os.O_RDONLY)
-
-                ev = select.kevent(fd,
-                        filter=select.KQ_FILTER_VNODE,
-                        flags=select.KQ_EV_ADD |
-                              select.KQ_EV_ENABLE |
-                              select.KQ_EV_CLEAR,
-                        fflags=select.KQ_NOTE_WRITE |
-                               select.KQ_NOTE_EXTEND)
-                kevents.append(ev)
-
-            events = self._kq.control(kevents, 0)
-            while True:
-                events = self._kq.control(kevents, 1)
-                fds = [e.ident for e in events]
-                modified = [k for k, v in self._files.iteritems()
-                                            if v in fds]
-                self.on_change(modified)
-        finally:
-            self.close()
+        self._kq = select.kqueue()
+        kevents = []
+        for f in self.filemap:
+            self.filemap[f] = fd = os.open(f, os.O_RDONLY)
+
+            ev = select.kevent(fd,
+                    filter=select.KQ_FILTER_VNODE,
+                    flags=select.KQ_EV_ADD |
+                            select.KQ_EV_ENABLE |
+                            select.KQ_EV_CLEAR,
+                    fflags=select.KQ_NOTE_WRITE |
+                            select.KQ_NOTE_EXTEND)
+            kevents.append(ev)
+
+        events = self._kq.control(kevents, 0)
+        while not self.shutdown_event.is_set():
+            events = self._kq.control(kevents, 1)
+            fds = [e.ident for e in events]
+            modified = [k for k, v in self.filemap.iteritems()
+                                        if v in fds]
+            self.on_change(modified)
 
-    def close(self):
+    def stop(self):
         self._kq.close()
         for f in self._files:
             if self._files[f] is not None:
                 os.close(self._files[f])
                 self._files[f] = None
 
-    def on_change(self, modified):
-        if self._on_change:
-            return self._on_change(modified)
 
 
 try:
     import pyinotify
+    _ProcessEvent = pyinotify.ProcessEvent
 except ImportError:
-    pyinotify = None    # noqa
+    pyinotify = None        # noqa
+    _ProcessEvent = object  # noqa
 
 
-class InotifyMonitor(pyinotify and pyinotify.ProcessEvent or object):
+class InotifyMonitor(_ProcessEvent):
     """File change monitor based on  Linux kernel `inotify` subsystem"""
-    def __init__(self, modules, on_change=None):
+
+    def __init__(self, modules, on_change=None, **kwargs):
         assert pyinotify
         self._modules = modules
         self._on_change = on_change
@@ -157,29 +165,32 @@ class InotifyMonitor(pyinotify and pyinotify.ProcessEvent or object):
 
     def on_change(self, modified):
         if self._on_change:
-            self._on_change(modified)
+            return self._on_change(modified)
 
 
-if hasattr(select, 'kqueue'):
-    _monitor_cls = KQueueMonitor
-elif sys.platform.startswith('linux') and pyinotify:
-    _monitor_cls = InotifyMonitor
+if hasattr(select, "kqueue"):
+    Monitor = KQueueMonitor
+elif sys.platform.startswith("linux") and pyinotify:
+    Monitor = InotifyMonitor
 else:
-    _monitor_cls = StatMonitor
+    Monitor = StatMonitor
 
 
-class AutoReloader(threading.Thread):
+class Autoreloader(bgThread):
     """Tracks changes in modules and fires reload commands"""
-    def __init__(self, modules, monitor_cls=_monitor_cls, logger=None,
-                 *args, **kwargs):
-        super(AutoReloader, self).__init__()
+    Monitor = Monitor
+
+    def __init__(self, modules, monitor_cls=None, logger=None, **kwargs):
+        super(Autoreloader, self).__init__()
         self.daemon = True
         self.logger = logger
         files = [sys.modules[m].__file__ for m in modules]
-        self._monitor = monitor_cls(files, self.on_change, *args, **kwargs)
+        self.Monitor = monitor_cls or self.Monitor
+        self._monitor = self.Monitor(files, self.on_change,
+                shutdown_event=self._is_shutdown, **kwargs)
         self._hashes = dict([(f, file_hash(f)) for f in files])
 
-    def start(self):
+    def body(self):
         self._monitor.start()
 
     def on_change(self, files):
@@ -190,18 +201,18 @@ class AutoReloader(threading.Thread):
                 modified.append(f)
                 self._hashes[f] = fhash
         if modified:
-            self.logger.debug("Detected modified modules: %s" %\
-                    map(self._module_name, modified))
+            self.logger.debug("Detected modified modules: %s" % (
+                    map(self._module_name, modified), ))
             self._reload(map(self._module_name, modified))
 
     def _reload(self, modules):
         current_app.control.broadcast("pool_restart",
                 arguments={"imports": modules, "reload_modules": True})
 
-    @classmethod
-    def _module_name(cls, path):
+    def stop(self):
+        self._monitor.stop()
+
+    @staticmethod
+    def _module_name(path):
         return os.path.splitext(os.path.basename(path))[0]
 
-    def stop(self):
-        if hasattr(self._monitor, 'stop'):
-            self._monitor.stop()