autoreload.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.autoreload
  4. ~~~~~~~~~~~~~~~~~~~~~~~~
  5. This module implements automatic module reloading
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import errno
  10. import hashlib
  11. import os
  12. import select
  13. import sys
  14. import time
  15. from collections import defaultdict
  16. from ..abstract import StartStopComponent
  17. from ..utils.threads import bgThread, Event
  18. try:
  19. import pyinotify
  20. _ProcessEvent = pyinotify.ProcessEvent
  21. except ImportError:
  22. pyinotify = None # noqa
  23. _ProcessEvent = object # noqa
  24. class WorkerComponent(StartStopComponent):
  25. name = "worker.autoreloader"
  26. requires = ("pool", )
  27. def __init__(self, w, autoreload=None, **kwargs):
  28. self.enabled = w.autoreload = autoreload
  29. w.autoreloader = None
  30. def create(self, w):
  31. w.autoreloader = self.instantiate(w.autoreloader_cls,
  32. controller=w,
  33. modules=w.autoreload,
  34. logger=w.logger)
  35. return w.autoreloader
  36. def file_hash(filename, algorithm="md5"):
  37. hobj = hashlib.new(algorithm)
  38. with open(filename, "rb") as f:
  39. for chunk in iter(lambda: f.read(2 ** 20), ''):
  40. hobj.update(chunk)
  41. return hobj.digest()
  42. class BaseMonitor(object):
  43. def __init__(self, files, on_change=None, shutdown_event=None,
  44. interval=0.5):
  45. self.files = files
  46. self.interval = interval
  47. self._on_change = on_change
  48. self.modify_times = defaultdict(int)
  49. self.shutdown_event = shutdown_event or Event()
  50. def start(self):
  51. raise NotImplementedError("Subclass responsibility")
  52. def stop(self):
  53. pass
  54. def on_change(self, modified):
  55. if self._on_change:
  56. return self._on_change(modified)
  57. class StatMonitor(BaseMonitor):
  58. """File change monitor based on the ``stat`` system call."""
  59. def _mtimes(self):
  60. return ((f, self._mtime(f)) for f in self.files)
  61. def _maybe_modified(self, f, mt):
  62. return mt is not None and self.modify_times[f] != mt
  63. def start(self):
  64. while not self.shutdown_event.is_set():
  65. modified = dict((f, mt) for f, mt in self._mtimes()
  66. if self._maybe_modified(f, mt))
  67. if modified:
  68. self.on_change(modified.keys())
  69. self.modify_times.update(modified)
  70. time.sleep(self.interval)
  71. @staticmethod
  72. def _mtime(path):
  73. try:
  74. return os.stat(path).st_mtime
  75. except Exception:
  76. pass
  77. class KQueueMonitor(BaseMonitor):
  78. """File change monitor based on BSD kernel event notifications"""
  79. def __init__(self, *args, **kwargs):
  80. assert hasattr(select, "kqueue")
  81. super(KQueueMonitor, self).__init__(*args, **kwargs)
  82. self.filemap = dict((f, None) for f in self.files)
  83. def start(self):
  84. self._kq = select.kqueue()
  85. kevents = []
  86. for f in self.filemap:
  87. self.filemap[f] = fd = os.open(f, os.O_RDONLY)
  88. ev = select.kevent(fd,
  89. filter=select.KQ_FILTER_VNODE,
  90. flags=select.KQ_EV_ADD |
  91. select.KQ_EV_ENABLE |
  92. select.KQ_EV_CLEAR,
  93. fflags=select.KQ_NOTE_WRITE |
  94. select.KQ_NOTE_EXTEND)
  95. kevents.append(ev)
  96. events = self._kq.control(kevents, 0)
  97. while not self.shutdown_event.is_set():
  98. events = self._kq.control(kevents, 1)
  99. fds = [e.ident for e in events]
  100. modified = [k for k, v in self.filemap.iteritems()
  101. if v in fds]
  102. self.on_change(modified)
  103. def stop(self):
  104. self._kq.close()
  105. for fd in filter(None, self.filemap.values()):
  106. try:
  107. os.close(fd)
  108. except OSError, exc:
  109. if exc != errno.EBADF:
  110. raise
  111. self.filemap[fd] = None
  112. self.filemap.clear()
  113. class InotifyMonitor(_ProcessEvent):
  114. """File change monitor based on Linux kernel `inotify` subsystem"""
  115. def __init__(self, modules, on_change=None, **kwargs):
  116. assert pyinotify
  117. self._modules = modules
  118. self._on_change = on_change
  119. def start(self):
  120. try:
  121. self._wm = pyinotify.WatchManager()
  122. self._notifier = pyinotify.Notifier(self._wm)
  123. for m in self._modules:
  124. self._wm.add_watch(m, pyinotify.IN_MODIFY)
  125. self._notifier.loop()
  126. finally:
  127. self.close()
  128. def close(self):
  129. self._notifier.stop()
  130. self._wm.close()
  131. def process_IN_MODIFY(self, event):
  132. self.on_change(event.pathname)
  133. def on_change(self, modified):
  134. if self._on_change:
  135. return self._on_change(modified)
  136. # kqueue monitor not working properly at this time.
  137. #if hasattr(select, "kqueue"):
  138. # Monitor = KQueueMonitor
  139. if sys.platform.startswith("linux") and pyinotify:
  140. Monitor = InotifyMonitor
  141. else:
  142. Monitor = StatMonitor
  143. class Autoreloader(bgThread):
  144. """Tracks changes in modules and fires reload commands"""
  145. Monitor = Monitor
  146. def __init__(self, controller, modules=None, monitor_cls=None,
  147. logger=None, **options):
  148. super(Autoreloader, self).__init__()
  149. self.controller = controller
  150. app = self.controller.app
  151. self.modules = app.loader.task_modules if modules is None else modules
  152. self.logger = logger
  153. self.options = options
  154. self.Monitor = monitor_cls or self.Monitor
  155. def body(self):
  156. files = [sys.modules[m].__file__ for m in self.modules]
  157. self._monitor = self.Monitor(files, self.on_change,
  158. shutdown_event=self._is_shutdown, **self.options)
  159. self._hashes = dict([(f, file_hash(f)) for f in files])
  160. try:
  161. self._monitor.start()
  162. except OSError, exc:
  163. if exc.errno not in (errno.EINTR, errno.EAGAIN):
  164. raise
  165. def _maybe_modified(self, f):
  166. digest = file_hash(f)
  167. if digest != self._hashes[f]:
  168. self._hashes[f] = digest
  169. return True
  170. return False
  171. def on_change(self, files):
  172. modified = [f for f in files if self._maybe_modified(f)]
  173. if modified:
  174. self.logger.info("Detected modified modules: %s" % (
  175. map(self._module_name, modified), ))
  176. self._reload(map(self._module_name, modified))
  177. def _reload(self, modules):
  178. self.controller.reload(modules, reload=True)
  179. def stop(self):
  180. self._monitor.stop()
  181. @staticmethod
  182. def _module_name(path):
  183. return os.path.splitext(os.path.basename(path))[0]