autoreload.py 7.9 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.autoreload
  4. ~~~~~~~~~~~~~~~~~~~~~~~~
  5. This module implements automatic module reloading
  6. """
  7. from __future__ import absolute_import
  8. import errno
  9. import hashlib
  10. import os
  11. import select
  12. import sys
  13. import time
  14. from collections import defaultdict
  15. from kombu.utils import eventio
  16. from celery.platforms import ignore_EBADF
  17. from celery.utils.imports import module_file
  18. from celery.utils.log import get_logger
  19. from celery.utils.threads import bgThread, Event
  20. from .bootsteps import StartStopComponent
  # pyinotify is optional: when it cannot be imported the name is bound to
  # None and ``_ProcessEvent`` falls back to ``object`` so that classes
  # deriving from it can still be defined.
  try:  # pragma: no cover
      import pyinotify
      _ProcessEvent = pyinotify.ProcessEvent
  except ImportError:  # pragma: no cover
      pyinotify = None  # noqa
      _ProcessEvent = object  # noqa

  # Module-level logger named after this module.
  logger = get_logger(__name__)
  class WorkerComponent(StartStopComponent):
      """Worker component that creates the autoreloader for a worker ``w``.

      The autoreloader is built from ``w.autoreloader_cls`` and stored on
      ``w.autoreloader``.
      """
      name = 'worker.autoreloader'
      requires = ('pool', )

      def __init__(self, w, autoreload=None, **kwargs):
          """Store the ``autoreload`` flag and reset ``w.autoreloader``."""
          self.enabled = autoreload
          w.autoreload = autoreload
          w.autoreloader = None

      def create_ev(self, w):
          """Create the autoreloader and hook it into the worker's hub.

          The reloader's ``on_poll_init`` / ``on_poll_close`` callbacks are
          appended to ``w.hub.on_init`` / ``w.hub.on_close``.  Nothing is
          returned from this method.
          """
          reloader = self.instantiate(w.autoreloader_cls, w)
          w.autoreloader = reloader
          w.hub.on_init.append(reloader.on_poll_init)
          w.hub.on_close.append(reloader.on_poll_close)

      def create_threaded(self, w):
          """Create the autoreloader, store it on ``w`` and return it."""
          w.autoreloader = self.instantiate(w.autoreloader_cls, w)
          return w.autoreloader

      def create(self, w):
          """Pick the hub-driven variant when kqueue and the event loop are
          both available, otherwise the threaded variant."""
          event_driven = hasattr(select, 'kqueue') and w.use_eventloop
          factory = self.create_ev if event_driven else self.create_threaded
          return factory(w)
  def file_hash(filename, algorithm='md5'):
      """Return the raw digest of the contents of ``filename``.

      :param filename: path of the file to hash; it is opened in binary mode.
      :keyword algorithm: any name accepted by :func:`hashlib.new`
          (defaults to ``'md5'``).
      :returns: the result of ``digest()`` over the whole file.

      The file is consumed in chunks of ``2 ** 20`` bytes so large files are
      never loaded into memory at once.
      """
      hobj = hashlib.new(algorithm)
      with open(filename, 'rb') as f:
          # A binary read yields an empty *bytes* object at EOF, so the
          # sentinel has to be b''.  With the text sentinel '' the comparison
          # never succeeds on Python 3 and the loop would spin forever.
          for chunk in iter(lambda: f.read(2 ** 20), b''):
              hobj.update(chunk)
      return hobj.digest()
  class BaseMonitor(object):
      """Shared state and change-callback plumbing for file monitors.

      :param files: the collection of files to watch (stored as given).
      :keyword on_change: optional callable invoked with the modified files.
      :keyword shutdown_event: event object signalling shutdown; a fresh
          ``Event()`` is created when none (or a falsy value) is supplied.
      :keyword interval: stored on ``self.interval`` (defaults to ``0.5``).
      """

      def __init__(self, files, on_change=None, shutdown_event=None,
                   interval=0.5):
          if not shutdown_event:
              shutdown_event = Event()
          self.files = files
          self.interval = interval
          self._on_change = on_change
          # Missing entries read as 0.
          self.modify_times = defaultdict(int)
          self.shutdown_event = shutdown_event

      def start(self):
          """Must be provided by subclasses."""
          raise NotImplementedError('Subclass responsibility')

      def stop(self):
          """No-op by default."""
          pass

      def on_change(self, modified):
          """Forward ``modified`` to the callback, if one was configured.

          Returns whatever the callback returns, or ``None`` without one.
          """
          if not self._on_change:
              return None
          return self._on_change(modified)
  class StatMonitor(BaseMonitor):
      """File change monitor based on the ``stat`` system call."""

      def _mtimes(self):
          """Return a generator of ``(filename, mtime)`` for watched files."""
          return ((path, self._mtime(path)) for path in self.files)

      def _maybe_modified(self, f, mt):
          """True when ``mt`` is known and differs from the recorded value."""
          if mt is None:
              return False
          return self.modify_times[f] != mt

      def start(self):
          """Poll modification times until the shutdown event is set.

          On every pass the files whose mtime differs from the recorded one
          are reported through ``on_change`` and then recorded; the loop
          sleeps ``self.interval`` seconds between passes.
          """
          while not self.shutdown_event.is_set():
              changed = {}
              for path, stamp in self._mtimes():
                  if self._maybe_modified(path, stamp):
                      changed[path] = stamp
              if changed:
                  self.on_change(changed.keys())
                  self.modify_times.update(changed)
              time.sleep(self.interval)

      @staticmethod
      def _mtime(path):
          """Return ``st_mtime`` of ``path``, or ``None`` if stat fails."""
          try:
              return os.stat(path).st_mtime
          except Exception:
              # Any failure is reported as "no modification time".
              return None
  class KQueueMonitor(BaseMonitor):
      """File change monitor based on BSD kernel event notifications

      The monitor can be driven in two ways: by an external hub (through
      :meth:`on_poll_init` / :meth:`on_poll_close`, which use ``hub.poller``)
      or stand-alone through :meth:`start`, which creates its own poller.
      """

      def __init__(self, *args, **kwargs):
          super(KQueueMonitor, self).__init__(*args, **kwargs)
          # filename -> open file descriptor (None until add_events runs)
          self.filemap = dict((f, None) for f in self.files)
          # file descriptor -> filename, used to translate incoming events
          self.fdmap = {}
          # Private poller; only created by start().  Kept as None otherwise
          # so that stop() is safe to call on a monitor that never started.
          self.poller = None

      def on_poll_init(self, hub):
          """Register the watched files with ``hub.poller``."""
          self.add_events(hub.poller)
          hub.poller.on_file_change = self.handle_event

      def on_poll_close(self, hub):
          """Unregister and close all watched descriptors on ``hub.poller``."""
          self.close(hub.poller)

      def add_events(self, poller):
          """Open every watched file read-only and watch its descriptor."""
          for f in self.filemap:
              self.filemap[f] = fd = os.open(f, os.O_RDONLY)
              self.fdmap[fd] = f
              poller.watch_file(fd)

      def handle_event(self, events):
          """Map each event's ``ident`` back to a filename and report them."""
          self.on_change([self.fdmap[e.ident] for e in events])

      def start(self):
          """Create a private poller and poll until the shutdown event is set."""
          self.poller = eventio.poll()
          self.add_events(self.poller)
          self.poller.on_file_change = self.handle_event
          while not self.shutdown_event.is_set():
              self.poller.poll(1)

      def close(self, poller):
          """Unregister every opened descriptor from ``poller`` and close it.

          Both lookup tables are emptied afterwards.
          """
          # ``values()`` exists on both Python 2 and 3 (``iteritems`` is
          # Python 2 only) and the filename is not needed here.
          for fd in self.filemap.values():
              if fd is not None:
                  poller.unregister(fd)
                  with ignore_EBADF():  # pragma: no cover
                      os.close(fd)
          self.filemap.clear()
          self.fdmap.clear()

      def stop(self):
          """Release the private poller created by :meth:`start`, if any."""
          if self.poller is None:
              # start() was never called (e.g. the monitor is hub-driven),
              # so there is no private poller to clean up.
              return
          self.close(self.poller)
          self.poller.close()
  class InotifyMonitor(_ProcessEvent):
      """File change monitor based on Linux kernel `inotify` subsystem"""

      def __init__(self, modules, on_change=None, **kwargs):
          """Remember the paths to watch and the change callback.

          Asserts that ``pyinotify`` is available.
          """
          assert pyinotify
          self._modules = modules
          self._on_change = on_change
          self._wm = None
          self._notifier = None

      def start(self):
          """Watch every path for modify/attrib events and run the notifier loop.

          The watch manager is closed and both references are reset once the
          loop ends or an error occurs.
          """
          try:
              self._wm = pyinotify.WatchManager()
              self._notifier = pyinotify.Notifier(self._wm, self)
              watch = self._wm.add_watch
              mask = pyinotify.IN_MODIFY | pyinotify.IN_ATTRIB
              for path in self._modules:
                  watch(path, mask)
              self._notifier.loop()
          finally:
              if self._wm:
                  self._wm.close()
                  # Notifier.close is called at the end of Notifier.loop
                  self._wm = self._notifier = None

      def stop(self):
          """No-op."""
          pass

      def process_(self, event):
          """Report the path carried by ``event`` as changed."""
          self.on_change([event.path])

      # Both event kinds are handled by the same method.
      process_IN_ATTRIB = process_IN_MODIFY = process_

      def on_change(self, modified):
          """Forward ``modified`` to the callback, if one was configured."""
          if not self._on_change:
              return None
          return self._on_change(modified)
  def default_implementation():
      """Return the name of the monitor implementation to use by default.

      ``'kqueue'`` when :mod:`select` exposes ``kqueue``; otherwise
      ``'inotify'`` on Linux with ``pyinotify`` importable; else ``'stat'``.
      """
      # kqueue monitor not working properly at this time.
      # NOTE(review): despite the remark above, kqueue is still preferred
      # whenever it is available -- confirm which one is intended.
      if hasattr(select, 'kqueue'):
          return 'kqueue'
      on_linux = sys.platform.startswith('linux')
      return 'inotify' if on_linux and pyinotify else 'stat'
  # Implementation name -> monitor class.
  implementations = dict(
      kqueue=KQueueMonitor,
      inotify=InotifyMonitor,
      stat=StatMonitor,
  )

  # The CELERYD_FSNOTIFY environment variable, when set to a non-empty value,
  # overrides the automatically detected implementation.
  Monitor = implementations[
      os.environ.get('CELERYD_FSNOTIFY') or default_implementation()
  ]
  class Autoreloader(bgThread):
      """Tracks changes in modules and fires reload commands

      :param controller: object exposing ``app`` and ``reload(modules,
          reload=True)``.
      :keyword modules: names of the modules to watch; defaults to
          ``controller.app.loader.task_modules``.
      :keyword monitor_cls: accepted but not used by this class.
      :keyword \*\*options: extra keyword arguments passed to the monitor.
      """
      Monitor = Monitor

      def __init__(self, controller, modules=None, monitor_cls=None, **options):
          super(Autoreloader, self).__init__()
          self.controller = controller
          app = self.controller.app
          self.modules = app.loader.task_modules if modules is None else modules
          # NOTE(review): ``monitor_cls`` is ignored; the class attribute
          # ``Monitor`` is always used -- confirm whether that is intended.
          self.options = options
          self._monitor = None
          self._hashes = None
          # source filename -> module name
          self.file_to_module = {}

      def on_init(self):
          """Resolve module files, create the monitor and record file hashes."""
          files = self.file_to_module
          files.update(dict((module_file(sys.modules[m]), m)
                            for m in self.modules))
          # Hand the monitor a snapshot list rather than a live dict view
          # (``keys()`` is a view on Python 3).
          # ``_is_shutdown`` is presumably provided by ``bgThread``.
          self._monitor = self.Monitor(list(files), self.on_change,
                                       shutdown_event=self._is_shutdown,
                                       **self.options)
          self._hashes = dict((f, file_hash(f)) for f in files)

      def on_poll_init(self, hub):
          """Hub callback: lazily initialise, then let the monitor register."""
          if self._monitor is None:
              self.on_init()
          self._monitor.on_poll_init(hub)

      def on_poll_close(self, hub):
          """Hub callback: let the monitor unregister, if it was created."""
          if self._monitor is not None:
              self._monitor.on_poll_close(hub)

      def body(self):
          """Initialise and run the monitor.

          ``OSError`` with ``EINTR`` or ``EAGAIN`` is swallowed; any other
          ``OSError`` is re-raised.
          """
          self.on_init()
          try:
              self._monitor.start()
          except OSError as exc:  # ``as`` form is valid on Python 2.6+ and 3
              if exc.errno not in (errno.EINTR, errno.EAGAIN):
                  raise

      def _maybe_modified(self, f):
          """Return True when the content hash of ``f`` changed.

          The stored hash is updated when a change is detected.  A file that
          does not currently exist is reported as unmodified.
          """
          # Editors commonly save by replacing the file, so a change event
          # can arrive while the path is briefly missing; hashing it then
          # would raise and terminate the monitor.
          if not os.path.exists(f):
              return False
          digest = file_hash(f)
          if digest != self._hashes[f]:
              self._hashes[f] = digest
              return True
          return False

      def on_change(self, files):
          """Reload the modules whose files really changed (by content hash)."""
          modified = [f for f in files if self._maybe_modified(f)]
          if modified:
              names = [self.file_to_module[filename] for filename in modified]
              logger.info('Detected modified modules: %r', names)
              self._reload(names)

      def _reload(self, modules):
          """Ask the controller to reload ``modules``."""
          self.controller.reload(modules, reload=True)

      def stop(self):
          """Stop the monitor if one has been created."""
          if self._monitor:
              self._monitor.stop()