autoreload.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.autoreload
  4. ~~~~~~~~~~~~~~~~~~~~~~~~
  5. This module implements automatic module reloading
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import errno
  10. import hashlib
  11. import os
  12. import select
  13. import sys
  14. import time
  15. from collections import defaultdict
  16. from ..abstract import StartStopComponent
  17. from ..utils.threads import bgThread, Event
  18. try:
  19. import pyinotify
  20. _ProcessEvent = pyinotify.ProcessEvent
  21. except ImportError:
  22. pyinotify = None # noqa
  23. _ProcessEvent = object # noqa
  24. class WorkerComponent(StartStopComponent):
  25. name = "worker.autoreloader"
  26. requires = ("pool", )
  27. def __init__(self, w, autoreload=None, **kwargs):
  28. self.enabled = w.autoreload = autoreload
  29. w.autoreloader = None
  30. def create(self, w):
  31. w.autoreloader = self.instantiate(w.autoreloader_cls,
  32. controller=w,
  33. logger=w.logger)
  34. return w.autoreloader
  35. def file_hash(filename, algorithm="md5"):
  36. hobj = hashlib.new(algorithm)
  37. with open(filename, "rb") as f:
  38. for chunk in iter(lambda: f.read(2 ** 20), ''):
  39. hobj.update(chunk)
  40. return hobj.digest()
  41. class BaseMonitor(object):
  42. def __init__(self, files, on_change=None, shutdown_event=None,
  43. interval=0.5):
  44. self.files = files
  45. self.interval = interval
  46. self._on_change = on_change
  47. self.modify_times = defaultdict(int)
  48. self.shutdown_event = shutdown_event or Event()
  49. def start(self):
  50. raise NotImplementedError("Subclass responsibility")
  51. def stop(self):
  52. pass
  53. def on_change(self, modified):
  54. if self._on_change:
  55. return self._on_change(modified)
  56. class StatMonitor(BaseMonitor):
  57. """File change monitor based on the ``stat`` system call."""
  58. def _mtimes(self):
  59. return ((f, self._mtime(f)) for f in self.files)
  60. def _maybe_modified(self, f, mt):
  61. return mt is not None and self.modify_times[f] != mt
  62. def start(self):
  63. while not self.shutdown_event.is_set():
  64. modified = dict((f, mt) for f, mt in self._mtimes()
  65. if self._maybe_modified(f, mt))
  66. if modified:
  67. self.on_change(modified.keys())
  68. self.modify_times.update(modified)
  69. time.sleep(self.interval)
  70. @staticmethod
  71. def _mtime(path):
  72. try:
  73. return os.stat(path).st_mtime
  74. except Exception:
  75. pass
  76. class KQueueMonitor(BaseMonitor):
  77. """File change monitor based on BSD kernel event notifications"""
  78. def __init__(self, *args, **kwargs):
  79. assert hasattr(select, "kqueue")
  80. super(KQueueMonitor, self).__init__(*args, **kwargs)
  81. self.filemap = dict((f, None) for f in self.files)
  82. def start(self):
  83. self._kq = select.kqueue()
  84. kevents = []
  85. for f in self.filemap:
  86. self.filemap[f] = fd = os.open(f, os.O_RDONLY)
  87. ev = select.kevent(fd,
  88. filter=select.KQ_FILTER_VNODE,
  89. flags=select.KQ_EV_ADD |
  90. select.KQ_EV_ENABLE |
  91. select.KQ_EV_CLEAR,
  92. fflags=select.KQ_NOTE_WRITE |
  93. select.KQ_NOTE_EXTEND)
  94. kevents.append(ev)
  95. events = self._kq.control(kevents, 0)
  96. while not self.shutdown_event.is_set():
  97. events = self._kq.control(kevents, 1)
  98. fds = [e.ident for e in events]
  99. modified = [k for k, v in self.filemap.iteritems()
  100. if v in fds]
  101. self.on_change(modified)
  102. def stop(self):
  103. self._kq.close()
  104. for fd in filter(None, self.filemap.values()):
  105. try:
  106. os.close(fd)
  107. except OSError, exc:
  108. if exc != errno.EBADF:
  109. raise
  110. self.filemap[fd] = None
  111. self.filemap.clear()
  112. class InotifyMonitor(_ProcessEvent):
  113. """File change monitor based on Linux kernel `inotify` subsystem"""
  114. def __init__(self, modules, on_change=None, **kwargs):
  115. assert pyinotify
  116. self._modules = modules
  117. self._on_change = on_change
  118. self._wm = None
  119. self._notifier = None
  120. def start(self):
  121. try:
  122. self._wm = pyinotify.WatchManager()
  123. self._notifier = pyinotify.Notifier(self._wm, self)
  124. add_watch = self._wm.add_watch
  125. for m in self._modules:
  126. add_watch(m, pyinotify.IN_MODIFY | pyinotify.IN_ATTRIB)
  127. self._notifier.loop()
  128. finally:
  129. if self._wm:
  130. self._wm.close()
  131. # Notifier.close is called at the end of Notifier.loop
  132. self._wm = self._notifier = None
  133. def stop(self):
  134. pass
  135. def process_(self, event):
  136. self.on_change([event.path])
  137. process_IN_ATTRIB = process_IN_MODIFY = process_
  138. def on_change(self, modified):
  139. if self._on_change:
  140. return self._on_change(modified)
  141. def default_implementation():
  142. # kqueue monitor not working properly at this time.
  143. if hasattr(select, "kqueue"):
  144. return "kqueue"
  145. if sys.platform.startswith("linux") and pyinotify:
  146. return "inotify"
  147. else:
  148. return "stat"
  149. implementations = {"kqueue": KQueueMonitor,
  150. "inotify": InotifyMonitor,
  151. "stat": StatMonitor}
  152. Monitor = implementations[
  153. os.environ.get("CELERYD_FSNOTIFY") or default_implementation()]
  154. class Autoreloader(bgThread):
  155. """Tracks changes in modules and fires reload commands"""
  156. Monitor = Monitor
  157. def __init__(self, controller, modules=None, monitor_cls=None,
  158. logger=None, **options):
  159. super(Autoreloader, self).__init__()
  160. self.controller = controller
  161. app = self.controller.app
  162. self.modules = app.loader.task_modules if modules is None else modules
  163. self.logger = logger
  164. self.options = options
  165. self.Monitor = monitor_cls or self.Monitor
  166. self._monitor = None
  167. self._hashes = None
  168. def body(self):
  169. files = [sys.modules[m].__file__ for m in self.modules]
  170. self._monitor = self.Monitor(files, self.on_change,
  171. shutdown_event=self._is_shutdown, **self.options)
  172. self._hashes = dict([(f, file_hash(f)) for f in files])
  173. try:
  174. self._monitor.start()
  175. except OSError, exc:
  176. if exc.errno not in (errno.EINTR, errno.EAGAIN):
  177. raise
  178. def _maybe_modified(self, f):
  179. digest = file_hash(f)
  180. if digest != self._hashes[f]:
  181. self._hashes[f] = digest
  182. return True
  183. return False
  184. def on_change(self, files):
  185. modified = [f for f in files if self._maybe_modified(f)]
  186. if modified:
  187. names = [self._module_name(module) for module in modified]
  188. self.logger.info("Detected modified modules: %r", names)
  189. self._reload(names)
  190. def _reload(self, modules):
  191. self.controller.reload(modules, reload=True)
  192. def stop(self):
  193. if self._monitor:
  194. self._monitor.stop()
  195. @staticmethod
  196. def _module_name(path):
  197. return os.path.splitext(os.path.basename(path))[0]