# autoreload.py (7.9 KB)
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.autoreload
  4. ~~~~~~~~~~~~~~~~~~~~~~~~
  5. This module implements automatic module reloading
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import errno
  10. import hashlib
  11. import os
  12. import select
  13. import sys
  14. import time
  15. from collections import defaultdict
  16. from kombu.utils import eventio
  17. from celery.platforms import ignore_EBADF
  18. from celery.utils.imports import module_file
  19. from celery.utils.log import get_logger
  20. from celery.utils.threads import bgThread, Event
  21. from .abstract import StartStopComponent
  22. try: # pragma: no cover
  23. import pyinotify
  24. _ProcessEvent = pyinotify.ProcessEvent
  25. except ImportError: # pragma: no cover
  26. pyinotify = None # noqa
  27. _ProcessEvent = object # noqa
  28. logger = get_logger(__name__)
  class WorkerComponent(StartStopComponent):
      """Worker component that installs the autoreloader on the worker.

      ``create`` chooses between two setups: when :mod:`select` exposes
      ``kqueue`` and the worker uses the event loop, the reloader is hooked
      into the worker's hub; otherwise the reloader instance is handed back
      to the caller.
      """
      name = "worker.autoreloader"
      requires = ("pool", )

      def __init__(self, w, autoreload=None, **kwargs):
          """Store the ``autoreload`` flag and reset ``w.autoreloader``.

          :param w: the worker object being configured.
          :param autoreload: value stored both as ``self.enabled`` and as
              ``w.autoreload``.

          Any extra keyword arguments are accepted and ignored.
          """
          self.enabled = autoreload
          w.autoreload = autoreload
          w.autoreloader = None

      def create_ev(self, w):
          """Create the reloader and register its poll hooks on ``w.hub``.

          Nothing is returned; the reloader is only reachable through
          ``w.autoreloader`` and the hub callback lists.
          """
          reloader = self.instantiate(w.autoreloader_cls, w)
          w.autoreloader = reloader
          w.hub.on_init.append(reloader.on_poll_init)
          w.hub.on_close.append(reloader.on_poll_close)

      def create_threaded(self, w):
          """Create the reloader, store it on the worker and return it."""
          reloader = self.instantiate(w.autoreloader_cls, w)
          w.autoreloader = reloader
          return w.autoreloader

      def create(self, w):
          """Build the reloader using the hub-based or the threaded setup."""
          use_hub = hasattr(select, "kqueue") and w.use_eventloop
          if not use_hub:
              return self.create_threaded(w)
          return self.create_ev(w)
  def file_hash(filename, algorithm="md5"):
      """Return the raw digest of the contents of ``filename``.

      The file is opened in binary mode and fed to the hash object in
      1 MiB chunks, so the whole file is never held in memory at once.

      :param filename: path of the file to hash.
      :param algorithm: algorithm name passed to :func:`hashlib.new`.
      :returns: the value of ``digest()`` on the hash object (raw bytes,
          not a hex string).
      :raises IOError: (``OSError`` on Python 3) if the file cannot be
          opened or read.
      :raises ValueError: if :func:`hashlib.new` rejects ``algorithm``.
      """
      hobj = hashlib.new(algorithm)
      with open(filename, "rb") as f:
          while True:
              chunk = f.read(2 ** 20)
              # Test for emptiness rather than comparing against '':
              # on Python 3 a binary read returns b'' at EOF, which never
              # equals the text string '' and would loop forever.
              if not chunk:
                  break
              hobj.update(chunk)
      return hobj.digest()
  class BaseMonitor(object):
      """Common state and callback plumbing for the file monitors.

      Subclasses must implement :meth:`start`; :meth:`stop` does nothing
      by default.
      """

      def __init__(self, files, on_change=None, shutdown_event=None,
                   interval=0.5):
          """Store the monitor configuration.

          :param files: the files to watch, stored as given.
          :param on_change: optional callable invoked by :meth:`on_change`.
          :param shutdown_event: event object stored on the instance; a new
              ``Event()`` is created when a falsy value is passed.
          :param interval: stored as ``self.interval``.
          """
          if not shutdown_event:
              shutdown_event = Event()
          self.shutdown_event = shutdown_event
          # Unknown files report 0 because of the ``int`` default factory.
          self.modify_times = defaultdict(int)
          self._on_change = on_change
          self.interval = interval
          self.files = files

      def start(self):
          """Run the monitor; must be provided by subclasses."""
          raise NotImplementedError("Subclass responsibility")

      def stop(self):
          """Stop the monitor; nothing to do in the base class."""
          pass

      def on_change(self, modified):
          """Forward ``modified`` to the registered callback, if any.

          :returns: whatever the callback returns, or ``None`` when no
              callback was registered.
          """
          callback = self._on_change
          if not callback:
              return None
          return callback(modified)
  class StatMonitor(BaseMonitor):
      """File change monitor based on the ``stat`` system call."""

      def _mtimes(self):
          """Return an iterable of ``(path, mtime)`` pairs for all files."""
          return ((path, self._mtime(path)) for path in self.files)

      def _maybe_modified(self, f, mt):
          """Tell whether ``mt`` differs from the last recorded mtime of ``f``.

          A ``None`` mtime (the file could not be stat'ed) never counts as
          a modification.
          """
          if mt is None:
              return False
          return self.modify_times[f] != mt

      def start(self):
          """Poll the files until the shutdown event is set.

          On every pass the files whose mtime changed are reported through
          ``on_change`` and their new mtimes recorded, then the loop sleeps
          for ``self.interval`` seconds.
          """
          while not self.shutdown_event.is_set():
              changed = {}
              for path, stamp in self._mtimes():
                  if self._maybe_modified(path, stamp):
                      changed[path] = stamp
              if changed:
                  self.on_change(changed.keys())
                  self.modify_times.update(changed)
              time.sleep(self.interval)

      @staticmethod
      def _mtime(path):
          """Return the ``st_mtime`` of ``path``, or ``None`` on any error."""
          try:
              return os.stat(path).st_mtime
          except Exception:
              return None
  class KQueueMonitor(BaseMonitor):
      """File change monitor based on BSD kernel event notifications"""

      def __init__(self, *args, **kwargs):
          """Accept the same arguments as :class:`BaseMonitor`."""
          super(KQueueMonitor, self).__init__(*args, **kwargs)
          # path -> open file descriptor (None until add_events opens it).
          self.filemap = dict((f, None) for f in self.files)
          # file descriptor -> path, used to map events back to files.
          self.fdmap = {}
          # Only created by start(); stays None when the monitor is driven
          # through on_poll_init()/on_poll_close() with an external poller.
          self.poller = None

      def on_poll_init(self, hub):
          """Watch the files on ``hub.poller`` and install the event handler."""
          self.add_events(hub.poller)
          hub.poller.on_file_change = self.handle_event

      def on_poll_close(self, hub):
          """Unregister and close every watched descriptor on ``hub.poller``."""
          self.close(hub.poller)

      def add_events(self, poller):
          """Open every watched file read-only and register it with ``poller``.

          :raises OSError: if a file cannot be opened.
          """
          for f in self.filemap:
              self.filemap[f] = fd = os.open(f, os.O_RDONLY)
              self.fdmap[fd] = f
              poller.watch_file(fd)

      def handle_event(self, events):
          """Translate poller events into file paths and report them.

          Each event's ``ident`` is looked up in ``fdmap``; an unknown
          ident raises :exc:`KeyError`.
          """
          self.on_change([self.fdmap[e.ident] for e in events])

      def start(self):
          """Create a private poller and poll it until shutdown is requested."""
          self.poller = eventio.poll()
          self.add_events(self.poller)
          self.poller.on_file_change = self.handle_event
          while not self.shutdown_event.is_set():
              self.poller.poll(1)

      def close(self, poller):
          """Unregister and close all open descriptors, then reset the maps."""
          # values() exists on both Python 2 and 3 (iteritems() is
          # Python 2 only) and the path key is not needed here.
          for fd in self.filemap.values():
              if fd is not None:
                  poller.unregister(fd)
                  with ignore_EBADF():  # pragma: no cover
                      os.close(fd)
          self.filemap.clear()
          self.fdmap.clear()

      def stop(self):
          """Release the poller created by :meth:`start`, if there is one."""
          poller = self.poller
          if poller is None:
              # start() never ran, so there is no private poller to close.
              return
          self.close(poller)
          poller.close()
  class InotifyMonitor(_ProcessEvent):
      """File change monitor based on Linux kernel `inotify` subsystem"""

      def __init__(self, modules, on_change=None, **kwargs):
          """Store the paths to watch and the change callback.

          :param modules: iterable of paths handed to ``add_watch``.
          :param on_change: optional callable invoked by :meth:`on_change`.

          Extra keyword arguments are accepted and ignored.
          """
          assert pyinotify
          self._wm = self._notifier = None
          self._on_change = on_change
          self._modules = modules

      def start(self):
          """Watch every path for modify/attrib events and run the loop.

          The watch manager is closed and both references are reset when
          the notifier loop ends, whether normally or through an exception.
          """
          try:
              self._wm = pyinotify.WatchManager()
              self._notifier = pyinotify.Notifier(self._wm, self)
              watch = self._wm.add_watch
              mask = pyinotify.IN_MODIFY | pyinotify.IN_ATTRIB
              for target in self._modules:
                  watch(target, mask)
              self._notifier.loop()
          finally:
              manager = self._wm
              if manager:
                  manager.close()
                  # Notifier.close is called at the end of Notifier.loop
                  self._wm = self._notifier = None

      def stop(self):
          """No-op."""
          pass

      def process_(self, event):
          """Report the path carried by ``event`` as modified."""
          self.on_change([event.path])

      # Both event kinds are handled by the same method.
      process_IN_MODIFY = process_
      process_IN_ATTRIB = process_

      def on_change(self, modified):
          """Forward ``modified`` to the registered callback, if any."""
          callback = self._on_change
          if not callback:
              return None
          return callback(modified)
  def default_implementation():
      """Return the name of the monitor implementation to use by default.

      ``"kqueue"`` when :mod:`select` provides ``kqueue``; otherwise
      ``"inotify"`` on Linux when ``pyinotify`` is available; ``"stat"``
      in every other case.
      """
      # kqueue monitor not working properly at this time.
      if hasattr(select, "kqueue"):
          return "kqueue"
      on_linux = sys.platform.startswith("linux")
      return "inotify" if (on_linux and pyinotify) else "stat"
  # Monitor classes by name.
  implementations = dict(
      kqueue=KQueueMonitor,
      inotify=InotifyMonitor,
      stat=StatMonitor,
  )

  # The CELERYD_FSNOTIFY environment variable, when set to a non-empty
  # value, selects the implementation by name; otherwise the platform
  # default is used.  An unknown name raises KeyError at import time.
  Monitor = implementations[
      os.environ.get("CELERYD_FSNOTIFY") or default_implementation()
  ]
  class Autoreloader(bgThread):
      """Tracks changes in modules and fires reload commands"""
      Monitor = Monitor

      def __init__(self, controller, modules=None, monitor_cls=None, **options):
          """Set up the reloader; the monitor itself is created lazily.

          :param controller: object exposing ``app`` and ``reload()``.
          :param modules: names of the modules to track (keys of
              ``sys.modules``); defaults to ``app.loader.task_modules``.
          :param monitor_cls: accepted but currently ignored; the class
              attribute ``Monitor`` is always used.
          :param options: extra keyword arguments passed to the monitor
              class when it is instantiated.
          """
          super(Autoreloader, self).__init__()
          self.controller = controller
          app = self.controller.app
          self.modules = app.loader.task_modules if modules is None else modules
          self.options = options
          self._monitor = None
          self._hashes = None
          # file path -> module name, filled in by on_init().
          self._file_to_module = {}

      def on_init(self):
          """Resolve module files, hash them and instantiate the monitor."""
          files = []
          for name in self.modules:
              path = module_file(sys.modules[name])
              # Remember which module each file belongs to so a change can
              # be reported under the module's real (possibly dotted) name.
              self._file_to_module[path] = name
              files.append(path)
          self._hashes = dict((f, file_hash(f)) for f in files)
          self._monitor = self.Monitor(files, self.on_change,
                  shutdown_event=self._is_shutdown, **self.options)

      def on_poll_init(self, hub):
          """Create the monitor if needed and let it hook into ``hub``."""
          if self._monitor is None:
              self.on_init()
          self._monitor.on_poll_init(hub)

      def on_poll_close(self, hub):
          """Let the monitor, if one exists, detach from ``hub``."""
          if self._monitor is not None:
              self._monitor.on_poll_close(hub)

      def body(self):
          """Initialise and run the monitor.

          :exc:`OSError` with errno ``EINTR`` or ``EAGAIN`` is swallowed;
          any other error propagates.
          """
          self.on_init()
          try:
              self._monitor.start()
          except OSError:
              # sys.exc_info() is used instead of ``except OSError, exc``
              # (Python 2 only) or ``as exc`` (2.6+) so the module parses
              # on every Python version.
              exc = sys.exc_info()[1]
              if exc.errno not in (errno.EINTR, errno.EAGAIN):
                  raise

      def _maybe_modified(self, f):
          """Return True if the content hash of ``f`` changed since last seen.

          The stored hash is updated when a change is detected.  A file
          that cannot be read is reported as not modified.
          """
          try:
              digest = file_hash(f)
          except EnvironmentError:
              # The file vanished or is unreadable right now (for example
              # while an editor is replacing it): there is nothing to
              # reload, and the monitor must keep running.
              return False
          if digest != self._hashes[f]:
              self._hashes[f] = digest
              return True
          return False

      def on_change(self, files):
          """Reload the modules whose files really changed content."""
          modified = [f for f in files if self._maybe_modified(f)]
          if modified:
              # Prefer the name the module was registered under; the file
              # basename drops the package path and yields "__init__" for
              # packages.  Fall back to it only for unknown files.
              names = [self._file_to_module.get(f) or self._module_name(f)
                       for f in modified]
              logger.info("Detected modified modules: %r", names)
              self._reload(names)

      def _reload(self, modules):
          """Ask the controller to reload ``modules``."""
          self.controller.reload(modules, reload=True)

      def stop(self):
          """Stop the monitor if one was created."""
          if self._monitor:
              self._monitor.stop()

      @staticmethod
      def _module_name(path):
          """Return the basename of ``path`` without its extension."""
          return os.path.splitext(os.path.basename(path))[0]