autoreload.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.autoreload
  4. ~~~~~~~~~~~~~~~~~~~~~~~~
  5. This module implements automatic module reloading
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import os
  10. import sys
  11. import time
  12. import select
  13. import hashlib
  14. from collections import defaultdict
  15. from .. import current_app
  16. from ..abstract import StartStopComponent
  17. from ..utils.threads import bgThread, Event
  18. try:
  19. import pyinotify
  20. _ProcessEvent = pyinotify.ProcessEvent
  21. except ImportError:
  22. pyinotify = None # noqa
  23. _ProcessEvent = object # noqa
  24. class WorkerComponent(StartStopComponent):
  25. name = "worker.autoreloader"
  26. requires = ("pool", )
  27. def __init__(self, w, autoreload=None, **kwargs):
  28. self.enabled = w.autoreload = autoreload
  29. w.autoreloader = None
  30. def create(self, w):
  31. w.autoreloader = self.instantiate(w.autoreloader_cls,
  32. modules=w.autoreload,
  33. logger=w.logger)
  34. return w.autoreloader
  35. def file_hash(filename, algorithm="md5"):
  36. hobj = hashlib.new(algorithm)
  37. with open(filename, "rb") as f:
  38. for chunk in iter(lambda: f.read(2 ** 20), ''):
  39. hobj.update(chunk)
  40. return hobj.digest()
  41. class BaseMonitor(object):
  42. def __init__(self, files, on_change=None, shutdown_event=None,
  43. interval=0.5):
  44. self.files = files
  45. self.interval = interval
  46. self._on_change = on_change
  47. self.modify_times = defaultdict(int)
  48. self.shutdown_event = shutdown_event or Event()
  49. def start(self):
  50. raise NotImplementedError("Subclass responsibility")
  51. def stop(self):
  52. pass
  53. def on_change(self, modified):
  54. if self._on_change:
  55. return self._on_change(modified)
  56. class StatMonitor(BaseMonitor):
  57. """File change monitor based on the ``stat`` system call."""
  58. def start(self):
  59. while not self.shutdown_event.is_set():
  60. modified = {}
  61. for m in self.files:
  62. mt = self._mtime(m)
  63. if mt is None:
  64. break
  65. if self.modify_times[m] != mt:
  66. modified[m] = mt
  67. else:
  68. if modified:
  69. self.on_change(modified.keys())
  70. self.modify_times.update(modified)
  71. time.sleep(self.interval)
  72. @staticmethod
  73. def _mtime(path):
  74. try:
  75. return os.stat(path).st_mtime
  76. except Exception:
  77. pass
  78. class KQueueMonitor(BaseMonitor):
  79. """File change monitor based on BSD kernel event notifications"""
  80. def __init__(self, *args, **kwargs):
  81. assert hasattr(select, "kqueue")
  82. super(KQueueMonitor, self).__init__(*args, **kwargs)
  83. self.filemap = dict((f, None) for f in self.files)
  84. def start(self):
  85. self._kq = select.kqueue()
  86. kevents = []
  87. for f in self.filemap:
  88. self.filemap[f] = fd = os.open(f, os.O_RDONLY)
  89. ev = select.kevent(fd,
  90. filter=select.KQ_FILTER_VNODE,
  91. flags=select.KQ_EV_ADD |
  92. select.KQ_EV_ENABLE |
  93. select.KQ_EV_CLEAR,
  94. fflags=select.KQ_NOTE_WRITE |
  95. select.KQ_NOTE_EXTEND)
  96. kevents.append(ev)
  97. events = self._kq.control(kevents, 0)
  98. while not self.shutdown_event.is_set():
  99. events = self._kq.control(kevents, 1)
  100. fds = [e.ident for e in events]
  101. modified = [k for k, v in self.filemap.iteritems()
  102. if v in fds]
  103. self.on_change(modified)
  104. def stop(self):
  105. self._kq.close()
  106. for f in self._files:
  107. if self._files[f] is not None:
  108. os.close(self._files[f])
  109. self._files[f] = None
  110. class InotifyMonitor(_ProcessEvent):
  111. """File change monitor based on Linux kernel `inotify` subsystem"""
  112. def __init__(self, modules, on_change=None, **kwargs):
  113. assert pyinotify
  114. self._modules = modules
  115. self._on_change = on_change
  116. def start(self):
  117. try:
  118. self._wm = pyinotify.WatchManager()
  119. self._notifier = pyinotify.Notifier(self._wm)
  120. for m in self._modules:
  121. self._wm.add_watch(m, pyinotify.IN_MODIFY)
  122. self._notifier.loop()
  123. finally:
  124. self.close()
  125. def close(self):
  126. self._notifier.stop()
  127. self._wm.close()
  128. def process_IN_MODIFY(self, event):
  129. self.on_change(event.pathname)
  130. def on_change(self, modified):
  131. if self._on_change:
  132. return self._on_change(modified)
  133. if hasattr(select, "kqueue"):
  134. Monitor = KQueueMonitor
  135. elif sys.platform.startswith("linux") and pyinotify:
  136. Monitor = InotifyMonitor
  137. else:
  138. Monitor = StatMonitor
  139. class Autoreloader(bgThread):
  140. """Tracks changes in modules and fires reload commands"""
  141. Monitor = Monitor
  142. def __init__(self, modules, monitor_cls=None, logger=None, **kwargs):
  143. super(Autoreloader, self).__init__()
  144. self.daemon = True
  145. self.logger = logger
  146. files = [sys.modules[m].__file__ for m in modules]
  147. self.Monitor = monitor_cls or self.Monitor
  148. self._monitor = self.Monitor(files, self.on_change,
  149. shutdown_event=self._is_shutdown, **kwargs)
  150. self._hashes = dict([(f, file_hash(f)) for f in files])
  151. def body(self):
  152. self._monitor.start()
  153. def on_change(self, files):
  154. modified = []
  155. for f in files:
  156. fhash = file_hash(f)
  157. if fhash != self._hashes[f]:
  158. modified.append(f)
  159. self._hashes[f] = fhash
  160. if modified:
  161. self.logger.debug("Detected modified modules: %s" % (
  162. map(self._module_name, modified), ))
  163. self._reload(map(self._module_name, modified))
  164. def _reload(self, modules):
  165. current_app.control.broadcast("pool_restart",
  166. arguments={"imports": modules, "reload_modules": True})
  167. def stop(self):
  168. self._monitor.stop()
  169. @staticmethod
  170. def _module_name(path):
  171. return os.path.splitext(os.path.basename(path))[0]